You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/07/14 07:19:00 UTC

[iotdb] branch to0.12calendarbug created (now ed3c96d)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch to0.12calendarbug
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at ed3c96d  modify tests

This branch includes the following new commits:

     new f71c72e  Cherry pick session manager and fix conflicts
     new 7872128  cherry pick fix calendar time zone
     new 2a47671  Fix bugs
     new ed3c96d  modify tests

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 03/04: Fix bugs

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch to0.12calendarbug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2a476712916213d9288bf8a95a2b9827c44b5e3c
Author: Alima777 <wx...@gmail.com>
AuthorDate: Tue Jul 13 15:07:57 2021 +0800

    Fix bugs
---
 .../java/org/apache/iotdb/db/query/control/SessionManager.java     | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index fde037f..548b543 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -68,7 +68,12 @@ public class SessionManager {
   }
 
   public TimeZone getCurrSessionTimeZone() {
-    return TimeZone.getTimeZone(SessionManager.getInstance().getZoneId(getCurrSessionId()));
+    if (getCurrSessionId() != null) {
+      return TimeZone.getTimeZone(SessionManager.getInstance().getZoneId(getCurrSessionId()));
+    } else {
+      // only used for test
+      return TimeZone.getDefault();
+    }
   }
 
   public long requestSessionId(String username, String zoneId) {

[iotdb] 04/04: modify tests

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch to0.12calendarbug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ed3c96d1f7f3f09c20655e7f104769b81734303b
Author: Alima777 <wx...@gmail.com>
AuthorDate: Tue Jul 13 21:24:58 2021 -0600

    modify tests
---
 .../main/java/org/apache/iotdb/db/query/control/SessionManager.java | 2 +-
 .../java/org/apache/iotdb/db/integration/IoTDBGroupByMonthIT.java   | 6 ++++++
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 548b543..91b37da 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -72,7 +72,7 @@ public class SessionManager {
       return TimeZone.getTimeZone(SessionManager.getInstance().getZoneId(getCurrSessionId()));
     } else {
       // only used for test
-      return TimeZone.getDefault();
+      return TimeZone.getTimeZone("+08:00");
     }
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByMonthIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByMonthIT.java
index 76e0dba..6292967 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByMonthIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByMonthIT.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.jdbc.IoTDBConnection;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -81,6 +82,7 @@ public class IoTDBGroupByMonthIT {
         "02/28/2021:00:00:00", "1.0"
       };
 
+      ((IoTDBConnection) connection).setTimeZone("GMT+00:00");
       boolean hasResultSet =
           statement.execute(
               "select sum(temperature) from root.sg1.d1 "
@@ -121,6 +123,7 @@ public class IoTDBGroupByMonthIT {
         "02/28/2021:00:00:00", "1.0"
       };
 
+      ((IoTDBConnection) connection).setTimeZone("GMT+00:00");
       boolean hasResultSet =
           statement.execute(
               "select sum(temperature) from root.sg1.d1 "
@@ -153,6 +156,7 @@ public class IoTDBGroupByMonthIT {
             DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
         Statement statement = connection.createStatement()) {
 
+      ((IoTDBConnection) connection).setTimeZone("GMT+00:00");
       boolean hasResultSet =
           statement.execute(
               "select sum(temperature) from root.sg1.d1 "
@@ -186,6 +190,7 @@ public class IoTDBGroupByMonthIT {
         "02/28/2021:00:00:00", "31.0"
       };
 
+      ((IoTDBConnection) connection).setTimeZone("GMT+00:00");
       boolean hasResultSet =
           statement.execute(
               "select sum(temperature) from root.sg1.d1 GROUP BY ([1612051200000, 1617148800000), 1mo)");
@@ -231,6 +236,7 @@ public class IoTDBGroupByMonthIT {
             DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
         Statement statement = connection.createStatement()) {
 
+      ((IoTDBConnection) connection).setTimeZone("GMT+00:00");
       boolean hasResultSet =
           statement.execute(
               "select sum(temperature) from root.sg1.d1 GROUP BY ([now() - 1mo, now()), 1d)");

[iotdb] 02/04: cherry pick fix calendar time zone

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch to0.12calendarbug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7872128b969054c3afab00008f9914d0e8b9845c
Author: Alima777 <wx...@gmail.com>
AuthorDate: Tue Jul 13 05:33:14 2021 +0200

    cherry pick fix calendar time zone
---
 .../apache/iotdb/db/qp/utils/DatetimeUtils.java    |  2 ++
 .../iotdb/db/query/control/SessionManager.java     | 16 ++++++++++
 .../dataset/groupby/GroupByEngineDataSet.java      |  2 ++
 .../iotdb/db/query/executor/QueryRouter.java       |  4 ++-
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 37 ++++++++++------------
 .../tsfile/read/filter/GroupByMonthFilter.java     |  6 +++-
 .../tsfile/read/filter/GroupByMonthFilterTest.java | 26 ++++++++++-----
 7 files changed, 63 insertions(+), 30 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/utils/DatetimeUtils.java b/server/src/main/java/org/apache/iotdb/db/qp/utils/DatetimeUtils.java
index 64698ef..1f5a425 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/utils/DatetimeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/utils/DatetimeUtils.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.qp.utils;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.LogicalOperatorException;
+import org.apache.iotdb.db.query.control.SessionManager;
 
 import java.time.DateTimeException;
 import java.time.Instant;
@@ -570,6 +571,7 @@ public class DatetimeUtils {
           res *= 30 * 86_400_000L;
         } else {
           Calendar calendar = Calendar.getInstance();
+          calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone());
           calendar.setTimeInMillis(currentTime);
           calendar.add(Calendar.MONTH, (int) (value));
           res = calendar.getTimeInMillis() - currentTime;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index a2a253f..fde037f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -29,6 +29,7 @@ import java.time.ZoneId;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
@@ -36,6 +37,8 @@ import java.util.concurrent.atomic.AtomicLong;
 public class SessionManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(SessionManager.class);
 
+  // When the client abnormally exits, we can still know who to disconnect
+  private final ThreadLocal<Long> currSessionId = new ThreadLocal<>();
   // Record the username for every rpc connection (session).
   private final Map<Long, String> sessionIdToUsername = new ConcurrentHashMap<>();
   private final Map<Long, ZoneId> sessionIdToZoneId = new ConcurrentHashMap<>();
@@ -56,8 +59,21 @@ public class SessionManager {
     // singleton
   }
 
+  public Long getCurrSessionId() {
+    return currSessionId.get();
+  }
+
+  public void removeCurrSessionId() {
+    currSessionId.remove();
+  }
+
+  public TimeZone getCurrSessionTimeZone() {
+    return TimeZone.getTimeZone(SessionManager.getInstance().getZoneId(getCurrSessionId()));
+  }
+
   public long requestSessionId(String username, String zoneId) {
     long sessionId = sessionIdGenerator.incrementAndGet();
+    currSessionId.set(sessionId);
     sessionIdToUsername.put(sessionId, username);
     sessionIdToZoneId.put(sessionId, ZoneId.of(zoneId));
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
index 4127b1a..7194633 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.dataset.groupby;
 
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
@@ -159,6 +160,7 @@ public abstract class GroupByEngineDataSet extends QueryDataSet {
    */
   public long calcIntervalByMonth(long numMonths) {
     Calendar calendar = Calendar.getInstance();
+    calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone());
     calendar.setTimeInMillis(startTime);
     calendar.add(Calendar.MONTH, (int) (numMonths));
     return calendar.getTimeInMillis();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index d7aa307..77bfd59 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.query.dataset.groupby.GroupByEngineDataSet;
 import org.apache.iotdb.db.query.dataset.groupby.GroupByFillDataSet;
 import org.apache.iotdb.db.query.dataset.groupby.GroupByTimeDataSet;
@@ -198,7 +199,8 @@ public class QueryRouter implements IQueryRouter {
               plan.getStartTime(),
               plan.getEndTime(),
               plan.isSlidingStepByMonth(),
-              plan.isIntervalByMonth())));
+              plan.isIntervalByMonth(),
+              SessionManager.getInstance().getCurrSessionTimeZone())));
     } else {
       return new GlobalTimeExpression(
           new GroupByFilter(
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 28c6b6d..537250c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -186,18 +186,14 @@ public class TSServiceImpl implements TSIService.Iface {
   protected Planner processor;
   protected IPlanExecutor executor;
 
-  private SessionManager sessionManager = SessionManager.getInstance();
-
-  // When the client abnormally exits, we can still know who to disconnect
-  private final ThreadLocal<Long> currSessionId = new ThreadLocal<>();
+  private final SessionManager sessionManager = SessionManager.getInstance();
+  private final QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
 
   public static final TSProtocolVersion CURRENT_RPC_VERSION =
       TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
 
   private static final AtomicInteger queryCount = new AtomicInteger(0);
 
-  private QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
-
   public TSServiceImpl() throws QueryProcessException {
     processor = new Planner();
     executor = new PlanExecutor();
@@ -256,7 +252,6 @@ public class TSServiceImpl implements TSIService.Iface {
       tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Login successfully");
 
       sessionId = sessionManager.requestSessionId(req.getUsername(), req.getZoneId());
-      currSessionId.set(sessionId);
       AUDIT_LOGGER.info("User {} opens Session-{}", req.getUsername(), sessionId);
       LOGGER.info(
           "{}: Login status: {}. User : {}",
@@ -285,7 +280,7 @@ public class TSServiceImpl implements TSIService.Iface {
     long sessionId = req.getSessionId();
     AUDIT_LOGGER.info("Session-{} is closing", sessionId);
 
-    currSessionId.remove();
+    sessionManager.removeCurrSessionId();
 
     return new TSStatus(
         !sessionManager.releaseSessionResource(sessionId)
@@ -309,7 +304,7 @@ public class TSServiceImpl implements TSIService.Iface {
       AUDIT_LOGGER.debug(
           "{}: receive close operation from Session {}",
           IoTDBConstant.GLOBAL_DB_NAME,
-          currSessionId.get());
+          sessionManager.getCurrSessionId());
     }
 
     try {
@@ -695,7 +690,8 @@ public class TSServiceImpl implements TSIService.Iface {
           QueryFilterOptimizationException, MetadataException, IOException, InterruptedException,
           TException, AuthException {
     queryCount.incrementAndGet();
-    AUDIT_LOGGER.debug("Session {} execute Query: {}", currSessionId.get(), statement);
+    AUDIT_LOGGER.debug(
+        "Session {} execute Query: {}", sessionManager.getCurrSessionId(), statement);
     long startTime = System.currentTimeMillis();
     long queryId = -1;
     try {
@@ -1226,7 +1222,7 @@ public class TSServiceImpl implements TSIService.Iface {
   }
 
   protected void handleClientExit() {
-    Long sessionId = currSessionId.get();
+    Long sessionId = sessionManager.getCurrSessionId();
     if (sessionId != null) {
       TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
       closeSession(req);
@@ -1280,7 +1276,7 @@ public class TSServiceImpl implements TSIService.Iface {
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, first device {}, first time {}",
-          currSessionId.get(),
+          sessionManager.getCurrSessionId(),
           req.deviceIds.get(0),
           req.getTimestamps().get(0));
     }
@@ -1346,7 +1342,7 @@ public class TSServiceImpl implements TSIService.Iface {
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, device {}, first time {}",
-          currSessionId.get(),
+          sessionManager.getCurrSessionId(),
           req.deviceId,
           req.getTimestamps().get(0));
     }
@@ -1388,7 +1384,7 @@ public class TSServiceImpl implements TSIService.Iface {
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, first device {}, first time {}",
-          currSessionId.get(),
+          sessionManager.getCurrSessionId(),
           req.deviceIds.get(0),
           req.getTimestamps().get(0));
     }
@@ -1494,7 +1490,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
       AUDIT_LOGGER.debug(
           "Session {} insertRecord, device {}, time {}",
-          currSessionId.get(),
+          sessionManager.getCurrSessionId(),
           req.getDeviceId(),
           req.getTimestamp());
 
@@ -1522,7 +1518,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
       AUDIT_LOGGER.debug(
           "Session {} insertRecord, device {}, time {}",
-          currSessionId.get(),
+          sessionManager.getCurrSessionId(),
           req.getDeviceId(),
           req.getTimestamp());
 
@@ -1693,7 +1689,8 @@ public class TSServiceImpl implements TSIService.Iface {
       }
 
       if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug("Session-{} create timeseries {}", currSessionId.get(), req.getPath());
+        AUDIT_LOGGER.debug(
+            "Session-{} create timeseries {}", sessionManager.getCurrSessionId(), req.getPath());
       }
 
       CreateTimeSeriesPlan plan =
@@ -1726,7 +1723,7 @@ public class TSServiceImpl implements TSIService.Iface {
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session-{} create {} timeseries, the first is {}",
-            currSessionId.get(),
+            sessionManager.getCurrSessionId(),
             req.getPaths().size(),
             req.getPaths().get(0));
       }
@@ -1835,7 +1832,7 @@ public class TSServiceImpl implements TSIService.Iface {
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session-{} create device template {}.{}.{}.{}.{}.{}",
-            currSessionId.get(),
+            sessionManager.getCurrSessionId(),
             req.getName(),
             req.getSchemaNames(),
             req.getMeasurements(),
@@ -1893,7 +1890,7 @@ public class TSServiceImpl implements TSIService.Iface {
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session-{} set device template {}.{}",
-          currSessionId.get(),
+          sessionManager.getCurrSessionId(),
           req.getTemplateName(),
           req.getPrefixPath());
     }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilter.java
index df64437..213d520 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilter.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.util.Calendar;
 import java.util.Objects;
+import java.util.TimeZone;
 
 /**
  * GroupByMonthFilter is used to handle natural month slidingStep and interval by generating
@@ -44,9 +45,11 @@ public class GroupByMonthFilter extends GroupByFilter {
       long startTime,
       long endTime,
       boolean isSlidingStepByMonth,
-      boolean isIntervalByMonth) {
+      boolean isIntervalByMonth,
+      TimeZone timeZone) {
     super(interval, slidingStep, startTime, endTime);
     initialStartTime = startTime;
+    calendar.setTimeZone(timeZone);
     calendar.setTimeInMillis(startTime);
     this.isIntervalByMonth = isIntervalByMonth;
     this.isSlidingStepByMonth = isSlidingStepByMonth;
@@ -68,6 +71,7 @@ public class GroupByMonthFilter extends GroupByFilter {
     slidingStepsInMo = filter.slidingStepsInMo;
     initialStartTime = filter.initialStartTime;
     calendar = Calendar.getInstance();
+    calendar.setTimeZone(filter.calendar.getTimeZone());
     calendar.setTimeInMillis(filter.calendar.getTimeInMillis());
   }
 
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilterTest.java
index d816090..a374c74 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilterTest.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import org.junit.Test;
 
+import java.util.TimeZone;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -41,7 +43,8 @@ public class GroupByMonthFilterTest {
   @Test
   public void TestSatisfy1() {
     GroupByMonthFilter filter =
-        new GroupByMonthFilter(MS_TO_MONTH, 2 * MS_TO_MONTH, 0, END_TIME, true, true);
+        new GroupByMonthFilter(
+            MS_TO_MONTH, 2 * MS_TO_MONTH, 0, END_TIME, true, true, TimeZone.getTimeZone("+08:00"));
 
     // 1970-01-01 08:00:00, timezone = GMT+08:00
     assertTrue(filter.satisfy(0, null));
@@ -75,7 +78,8 @@ public class GroupByMonthFilterTest {
   @Test
   public void TestSatisfy2() {
     GroupByMonthFilter filter =
-        new GroupByMonthFilter(MS_TO_MONTH, MS_TO_MONTH, 0, END_TIME, true, true);
+        new GroupByMonthFilter(
+            MS_TO_MONTH, MS_TO_MONTH, 0, END_TIME, true, true, TimeZone.getTimeZone("+08:00"));
 
     // 1970-01-01 08:00:00, timezone = GMT+08:00
     assertTrue(filter.satisfy(0, null));
@@ -106,7 +110,8 @@ public class GroupByMonthFilterTest {
   @Test
   public void TestSatisfy3() {
     GroupByMonthFilter filter =
-        new GroupByMonthFilter(MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false);
+        new GroupByMonthFilter(
+            MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false, TimeZone.getTimeZone("+08:00"));
 
     // 1970-01-01 08:00:00, timezone = GMT+08:00
     assertTrue(filter.satisfy(0, null));
@@ -137,7 +142,8 @@ public class GroupByMonthFilterTest {
   @Test
   public void TestSatisfy4() {
     GroupByMonthFilter filter =
-        new GroupByMonthFilter(MS_TO_MONTH, MS_TO_DAY * 100, 0, END_TIME, false, true);
+        new GroupByMonthFilter(
+            MS_TO_MONTH, MS_TO_DAY * 100, 0, END_TIME, false, true, TimeZone.getTimeZone("+08:00"));
 
     // 1970-01-01 08:00:00, timezone = GMT+08:00
     assertTrue(filter.satisfy(0, null));
@@ -156,7 +162,8 @@ public class GroupByMonthFilterTest {
   @Test
   public void TestSatisfyStartEndTime() {
     GroupByMonthFilter filter =
-        new GroupByMonthFilter(MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false);
+        new GroupByMonthFilter(
+            MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false, TimeZone.getTimeZone("+08:00"));
 
     // 1970-01-01 08:00:00 - 1970-01-02 08:00:00, timezone = GMT+08:00
     Statistics statistics = new LongStatistics();
@@ -202,7 +209,8 @@ public class GroupByMonthFilterTest {
   @Test
   public void TestContainStartEndTime() {
     GroupByMonthFilter filter =
-        new GroupByMonthFilter(MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false);
+        new GroupByMonthFilter(
+            MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false, TimeZone.getTimeZone("+08:00"));
 
     // 1970-01-01 08:00:00 - 1970-01-02 08:00:00, timezone = GMT+08:00
     assertFalse(filter.containStartEndTime(0, MS_TO_DAY));
@@ -238,11 +246,13 @@ public class GroupByMonthFilterTest {
   @Test
   public void TestEquals() {
     GroupByMonthFilter filter =
-        new GroupByMonthFilter(MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false);
+        new GroupByMonthFilter(
+            MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false, TimeZone.getTimeZone("+08:00"));
     Filter filter2 = filter.copy();
     assertEquals(filter, filter2);
     GroupByMonthFilter filter3 =
-        new GroupByMonthFilter(MS_TO_MONTH, MS_TO_MONTH, 0, END_TIME, true, true);
+        new GroupByMonthFilter(
+            MS_TO_MONTH, MS_TO_MONTH, 0, END_TIME, true, true, TimeZone.getTimeZone("+08:00"));
     assertNotEquals(filter, filter3);
   }
 }

[iotdb] 01/04: Cherry pick session manager and fix conflicts

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch to0.12calendarbug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f71c72e8d9995001db6ce7ddcb6e5056fe10c287
Author: J.J. Liu <li...@gmail.com>
AuthorDate: Sat Jun 26 21:42:09 2021 +0800

    Cherry pick session manager and fix conflicts
---
 .../iotdb/db/query/control/SessionManager.java     | 178 +++++++++++++++++++++
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 135 ++++------------
 2 files changed, 211 insertions(+), 102 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
new file mode 100644
index 0000000..a2a253f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control;
+
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.query.dataset.UDTFDataSet;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SessionManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SessionManager.class);
+
+  // Record the username for every rpc connection (session).
+  private final Map<Long, String> sessionIdToUsername = new ConcurrentHashMap<>();
+  private final Map<Long, ZoneId> sessionIdToZoneId = new ConcurrentHashMap<>();
+
+  // The sessionId is unique in one IoTDB instance.
+  private final AtomicLong sessionIdGenerator = new AtomicLong();
+  // The statementId is unique in one IoTDB instance.
+  private final AtomicLong statementIdGenerator = new AtomicLong();
+
+  // (sessionId -> Set(statementId))
+  private final Map<Long, Set<Long>> sessionIdToStatementId = new ConcurrentHashMap<>();
+  // (statementId -> Set(queryId))
+  private final Map<Long, Set<Long>> statementIdToQueryId = new ConcurrentHashMap<>();
+  // (queryId -> QueryDataSet)
+  private final Map<Long, QueryDataSet> queryIdToDataSet = new ConcurrentHashMap<>();
+
+  private SessionManager() {
+    // singleton
+  }
+
+  public long requestSessionId(String username, String zoneId) {
+    long sessionId = sessionIdGenerator.incrementAndGet();
+    sessionIdToUsername.put(sessionId, username);
+    sessionIdToZoneId.put(sessionId, ZoneId.of(zoneId));
+
+    return sessionId;
+  }
+
+  public boolean releaseSessionResource(long sessionId) {
+    sessionIdToZoneId.remove(sessionId);
+
+    for (long statementId :
+        sessionIdToStatementId.getOrDefault(sessionId, Collections.emptySet())) {
+      for (long queryId : statementIdToQueryId.getOrDefault(statementId, Collections.emptySet())) {
+        releaseQueryResourceNoExceptions(queryId);
+      }
+    }
+
+    return sessionIdToUsername.remove(sessionId) != null;
+  }
+
+  public long requestStatementId(long sessionId) {
+    long statementId = statementIdGenerator.incrementAndGet();
+    sessionIdToStatementId
+        .computeIfAbsent(sessionId, s -> new CopyOnWriteArraySet<>())
+        .add(statementId);
+    return statementId;
+  }
+
+  public void closeStatement(long sessionId, long statementId) {
+    Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
+    if (queryIdSet != null) {
+      for (long queryId : queryIdSet) {
+        releaseQueryResourceNoExceptions(queryId);
+      }
+    }
+
+    if (sessionIdToStatementId.containsKey(sessionId)) {
+      sessionIdToStatementId.get(sessionId).remove(statementId);
+    }
+  }
+
+  public long requestQueryId(
+      Long statementId, boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
+    long queryId = requestQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
+    statementIdToQueryId
+        .computeIfAbsent(statementId, k -> new CopyOnWriteArraySet<>())
+        .add(queryId);
+    return queryId;
+  }
+
+  public long requestQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
+    return QueryResourceManager.getInstance()
+        .assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
+  }
+
+  public void releaseQueryResource(long queryId) throws StorageEngineException {
+    QueryDataSet dataSet = queryIdToDataSet.remove(queryId);
+    if (dataSet instanceof UDTFDataSet) {
+      ((UDTFDataSet) dataSet).finalizeUDFs(queryId);
+    }
+    QueryResourceManager.getInstance().endQuery(queryId);
+  }
+
+  public void releaseQueryResourceNoExceptions(long queryId) {
+    if (queryId != -1) {
+      try {
+        releaseQueryResource(queryId);
+      } catch (Exception e) {
+        LOGGER.warn("Error occurred while releasing query resource: ", e);
+      }
+    }
+  }
+
+  public String getUsername(Long sessionId) {
+    return sessionIdToUsername.get(sessionId);
+  }
+
+  public ZoneId getZoneId(Long sessionId) {
+    return sessionIdToZoneId.get(sessionId);
+  }
+
+  public void setTimezone(Long sessionId, String zone) {
+    sessionIdToZoneId.put(sessionId, ZoneId.of(zone));
+  }
+
+  public boolean hasDataset(Long queryId) {
+    return queryIdToDataSet.containsKey(queryId);
+  }
+
+  public QueryDataSet getDataset(Long queryId) {
+    return queryIdToDataSet.get(queryId);
+  }
+
+  public void setDataset(Long queryId, QueryDataSet dataSet) {
+    queryIdToDataSet.put(queryId, dataSet);
+  }
+
+  public void removeDataset(Long queryId) {
+    queryIdToDataSet.remove(queryId);
+  }
+
+  public void closeDataset(Long statementId, Long queryId) {
+    releaseQueryResourceNoExceptions(queryId);
+    if (statementIdToQueryId.containsKey(statementId)) {
+      statementIdToQueryId.get(statementId).remove(queryId);
+    }
+  }
+
+  public static SessionManager getInstance() {
+    return SessionManagerHelper.INSTANCE;
+  }
+
+  private static class SessionManagerHelper {
+
+    private static final SessionManager INSTANCE = new SessionManager();
+
+    private SessionManagerHelper() {}
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 67f0467..28c6b6d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -72,13 +72,12 @@ import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.control.QueryTimeManager;
+import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.query.control.TracingManager;
 import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
 import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
 import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
-import org.apache.iotdb.db.query.dataset.UDTFDataSet;
 import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
@@ -141,19 +140,15 @@ import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
-import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 /** Thrift RPC implementation at server side. */
@@ -191,21 +186,7 @@ public class TSServiceImpl implements TSIService.Iface {
   protected Planner processor;
   protected IPlanExecutor executor;
 
-  // Record the username for every rpc connection (session).
-  private final Map<Long, String> sessionIdUsernameMap = new ConcurrentHashMap<>();
-  private final Map<Long, ZoneId> sessionIdZoneIdMap = new ConcurrentHashMap<>();
-
-  // The sessionId is unique in one IoTDB instance.
-  private final AtomicLong sessionIdGenerator = new AtomicLong();
-  // The statementId is unique in one IoTDB instance.
-  private final AtomicLong statementIdGenerator = new AtomicLong();
-
-  // (sessionId -> Set(statementId))
-  private final Map<Long, Set<Long>> sessionId2StatementId = new ConcurrentHashMap<>();
-  // (statementId -> Set(queryId))
-  private final Map<Long, Set<Long>> statementId2QueryId = new ConcurrentHashMap<>();
-  // (queryId -> QueryDataSet)
-  private final Map<Long, QueryDataSet> queryId2DataSet = new ConcurrentHashMap<>();
+  private SessionManager sessionManager = SessionManager.getInstance();
 
   // When the client abnormally exits, we can still know who to disconnect
   private final ThreadLocal<Long> currSessionId = new ThreadLocal<>();
@@ -273,9 +254,8 @@ public class TSServiceImpl implements TSIService.Iface {
       }
 
       tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Login successfully");
-      sessionId = sessionIdGenerator.incrementAndGet();
-      sessionIdUsernameMap.put(sessionId, req.getUsername());
-      sessionIdZoneIdMap.put(sessionId, ZoneId.of(req.getZoneId()));
+
+      sessionId = sessionManager.requestSessionId(req.getUsername(), req.getZoneId());
       currSessionId.set(sessionId);
       AUDIT_LOGGER.info("User {} opens Session-{}", req.getUsername(), sessionId);
       LOGGER.info(
@@ -306,16 +286,9 @@ public class TSServiceImpl implements TSIService.Iface {
     AUDIT_LOGGER.info("Session-{} is closing", sessionId);
 
     currSessionId.remove();
-    sessionIdZoneIdMap.remove(sessionId);
-
-    for (long statementId : sessionId2StatementId.getOrDefault(sessionId, Collections.emptySet())) {
-      for (long queryId : statementId2QueryId.getOrDefault(statementId, Collections.emptySet())) {
-        releaseQueryResourceNoExceptions(queryId);
-      }
-    }
 
     return new TSStatus(
-        sessionIdUsernameMap.remove(sessionId) == null
+        !sessionManager.releaseSessionResource(sessionId)
             ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
             : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
   }
@@ -340,25 +313,10 @@ public class TSServiceImpl implements TSIService.Iface {
     }
 
     try {
-      // ResultSet close
       if (req.isSetStatementId() && req.isSetQueryId()) {
-        releaseQueryResourceNoExceptions(req.queryId);
-        // clear the statementId2QueryId map
-        if (statementId2QueryId.containsKey(req.getStatementId())) {
-          statementId2QueryId.get(req.getStatementId()).remove(req.getQueryId());
-        }
+        sessionManager.closeDataset(req.statementId, req.queryId);
       } else {
-        // statement close
-        Set<Long> queryIdSet = statementId2QueryId.remove(req.getStatementId());
-        if (queryIdSet != null) {
-          for (long queryId : queryIdSet) {
-            releaseQueryResourceNoExceptions(queryId);
-          }
-        }
-        // clear the sessionId2StatementId map
-        if (sessionId2StatementId.containsKey(req.getSessionId())) {
-          sessionId2StatementId.get(req.getSessionId()).remove(req.getStatementId());
-        }
+        sessionManager.closeStatement(req.sessionId, req.statementId);
       }
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
     } catch (Exception e) {
@@ -369,22 +327,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
   /** release single operation resource */
   protected void releaseQueryResource(long queryId) throws StorageEngineException {
-    // remove the corresponding Physical Plan
-    QueryDataSet dataSet = queryId2DataSet.remove(queryId);
-    if (dataSet instanceof UDTFDataSet) {
-      ((UDTFDataSet) dataSet).finalizeUDFs(queryId);
-    }
-    QueryResourceManager.getInstance().endQuery(queryId);
-  }
-
-  private void releaseQueryResourceNoExceptions(long queryId) {
-    if (queryId != -1) {
-      try {
-        releaseQueryResource(queryId);
-      } catch (Exception e) {
-        LOGGER.warn("Error occurred while releasing query resource: ", e);
-      }
-    }
+    sessionManager.releaseQueryResource(queryId);
   }
 
   @Override
@@ -551,7 +494,7 @@ public class TSServiceImpl implements TSIService.Iface {
       try {
         PhysicalPlan physicalPlan =
             processor.parseSQLToPhysicalPlan(
-                statement, sessionIdZoneIdMap.get(req.getSessionId()), DEFAULT_FETCH_SIZE);
+                statement, sessionManager.getZoneId(req.sessionId), DEFAULT_FETCH_SIZE);
         if (physicalPlan.isQuery()) {
           throw new QueryInBatchStatementException(statement);
         }
@@ -649,7 +592,7 @@ public class TSServiceImpl implements TSIService.Iface {
       String statement = req.getStatement();
       PhysicalPlan physicalPlan =
           processor.parseSQLToPhysicalPlan(
-              statement, sessionIdZoneIdMap.get(req.getSessionId()), req.fetchSize);
+              statement, sessionManager.getZoneId(req.getSessionId()), req.fetchSize);
 
       return physicalPlan.isQuery()
           ? internalExecuteQueryStatement(
@@ -658,7 +601,7 @@ public class TSServiceImpl implements TSIService.Iface {
               physicalPlan,
               req.fetchSize,
               req.timeout,
-              sessionIdUsernameMap.get(req.getSessionId()),
+              sessionManager.getUsername(req.getSessionId()),
               req.isEnableRedirectQuery())
           : executeUpdateStatement(physicalPlan, req.getSessionId());
     } catch (InterruptedException e) {
@@ -680,7 +623,7 @@ public class TSServiceImpl implements TSIService.Iface {
       String statement = req.getStatement();
       PhysicalPlan physicalPlan =
           processor.parseSQLToPhysicalPlan(
-              statement, sessionIdZoneIdMap.get(req.getSessionId()), req.fetchSize);
+              statement, sessionManager.getZoneId(req.sessionId), req.fetchSize);
 
       return physicalPlan.isQuery()
           ? internalExecuteQueryStatement(
@@ -689,7 +632,7 @@ public class TSServiceImpl implements TSIService.Iface {
               physicalPlan,
               req.fetchSize,
               req.timeout,
-              sessionIdUsernameMap.get(req.getSessionId()),
+              sessionManager.getUsername(req.getSessionId()),
               req.isEnableRedirectQuery())
           : RpcUtils.getTSExecuteStatementResp(
               TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
@@ -712,7 +655,7 @@ public class TSServiceImpl implements TSIService.Iface {
       }
 
       PhysicalPlan physicalPlan =
-          processor.rawDataQueryReqToPhysicalPlan(req, sessionIdZoneIdMap.get(req.getSessionId()));
+          processor.rawDataQueryReqToPhysicalPlan(req, sessionManager.getZoneId(req.sessionId));
       return physicalPlan.isQuery()
           ? internalExecuteQueryStatement(
               "",
@@ -720,7 +663,7 @@ public class TSServiceImpl implements TSIService.Iface {
               physicalPlan,
               req.fetchSize,
               config.getQueryTimeoutThreshold(),
-              sessionIdUsernameMap.get(req.getSessionId()),
+              sessionManager.getUsername(req.sessionId),
               req.isEnableRedirectQuery())
           : RpcUtils.getTSExecuteStatementResp(
               TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
@@ -762,7 +705,7 @@ public class TSServiceImpl implements TSIService.Iface {
       fetchSize = p.left;
 
       // generate the queryId for the operation
-      queryId = generateQueryId(true, fetchSize, p.right);
+      queryId = sessionManager.requestQueryId(statementId, true, fetchSize, p.right);
       // register query info to queryTimeManager
       if (!(plan instanceof ShowQueryProcesslistPlan)) {
         queryTimeManager.registerQuery(queryId, startTime, statement, timeout);
@@ -776,10 +719,6 @@ public class TSServiceImpl implements TSIService.Iface {
         }
       }
 
-      statementId2QueryId
-          .computeIfAbsent(statementId, k -> new CopyOnWriteArraySet<>())
-          .add(queryId);
-
       if (plan instanceof AuthorPlan) {
         plan.setLoginUserName(username);
       }
@@ -870,7 +809,7 @@ public class TSServiceImpl implements TSIService.Iface {
       }
       return resp;
     } catch (Exception e) {
-      releaseQueryResourceNoExceptions(queryId);
+      sessionManager.releaseQueryResourceNoExceptions(queryId);
       throw e;
     } finally {
       Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, startTime);
@@ -1105,7 +1044,7 @@ public class TSServiceImpl implements TSIService.Iface {
         return RpcUtils.getTSFetchResultsResp(TSStatusCode.NOT_LOGIN_ERROR);
       }
 
-      if (!queryId2DataSet.containsKey(req.queryId)) {
+      if (!sessionManager.hasDataset(req.queryId)) {
         return RpcUtils.getTSFetchResultsResp(
             RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "Has not executed query"));
       }
@@ -1114,13 +1053,14 @@ public class TSServiceImpl implements TSIService.Iface {
       queryTimeManager.registerQuery(
           req.queryId, System.currentTimeMillis(), req.statement, req.timeout);
 
-      QueryDataSet queryDataSet = queryId2DataSet.get(req.queryId);
+      QueryDataSet queryDataSet = sessionManager.getDataset(req.queryId);
       if (req.isAlign) {
         TSQueryDataSet result =
-            fillRpcReturnData(req.fetchSize, queryDataSet, sessionIdUsernameMap.get(req.sessionId));
+            fillRpcReturnData(
+                req.fetchSize, queryDataSet, sessionManager.getUsername(req.sessionId));
         boolean hasResultSet = result.bufferForTime().limit() != 0;
         if (!hasResultSet) {
-          releaseQueryResourceNoExceptions(req.queryId);
+          sessionManager.releaseQueryResourceNoExceptions(req.queryId);
         }
         TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
         resp.setHasResultSet(hasResultSet);
@@ -1132,7 +1072,7 @@ public class TSServiceImpl implements TSIService.Iface {
       } else {
         TSQueryNonAlignDataSet nonAlignResult =
             fillRpcNonAlignReturnData(
-                req.fetchSize, queryDataSet, sessionIdUsernameMap.get(req.sessionId));
+                req.fetchSize, queryDataSet, sessionManager.getUsername(req.sessionId));
         boolean hasResultSet = false;
         for (ByteBuffer timeBuffer : nonAlignResult.getTimeList()) {
           if (timeBuffer.limit() != 0) {
@@ -1141,7 +1081,7 @@ public class TSServiceImpl implements TSIService.Iface {
           }
         }
         if (!hasResultSet) {
-          queryId2DataSet.remove(req.queryId);
+          sessionManager.removeDataset(req.queryId);
         }
         TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
         resp.setHasResultSet(hasResultSet);
@@ -1158,7 +1098,7 @@ public class TSServiceImpl implements TSIService.Iface {
           onNPEOrUnexpectedException(
               e, "executing fetchResults", TSStatusCode.INTERNAL_SERVER_ERROR));
     } catch (Exception e) {
-      releaseQueryResourceNoExceptions(req.queryId);
+      sessionManager.releaseQueryResourceNoExceptions(req.queryId);
       return RpcUtils.getTSFetchResultsResp(
           onNPEOrUnexpectedException(
               e, "executing fetchResults", TSStatusCode.INTERNAL_SERVER_ERROR));
@@ -1210,7 +1150,7 @@ public class TSServiceImpl implements TSIService.Iface {
     QueryContext context = genQueryContext(queryId, physicalPlan.isDebug());
     QueryDataSet queryDataSet = executor.processQuery(physicalPlan, context);
     queryDataSet.setFetchSize(fetchSize);
-    queryId2DataSet.put(queryId, queryDataSet);
+    sessionManager.setDataset(queryId, queryDataSet);
     return queryDataSet;
   }
 
@@ -1239,7 +1179,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
     status = executeNonQueryPlan(plan);
     TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(status);
-    long queryId = generateQueryId(false, DEFAULT_FETCH_SIZE, -1);
+    long queryId = sessionManager.requestQueryId(false, DEFAULT_FETCH_SIZE, -1);
     return resp.setQueryId(queryId);
   }
 
@@ -1256,7 +1196,7 @@ public class TSServiceImpl implements TSIService.Iface {
       throws QueryProcessException {
     PhysicalPlan physicalPlan =
         processor.parseSQLToPhysicalPlan(
-            statement, sessionIdZoneIdMap.get(sessionId), DEFAULT_FETCH_SIZE);
+            statement, sessionManager.getZoneId(sessionId), DEFAULT_FETCH_SIZE);
     return physicalPlan.isQuery()
         ? RpcUtils.getTSExecuteStatementResp(
             TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is a query statement.")
@@ -1269,7 +1209,7 @@ public class TSServiceImpl implements TSIService.Iface {
    * @return true: If logged in; false: If not logged in
    */
   private boolean checkLogin(long sessionId) {
-    boolean isLoggedIn = sessionIdUsernameMap.get(sessionId) != null;
+    boolean isLoggedIn = sessionManager.getUsername(sessionId) != null;
     if (!isLoggedIn) {
       LOGGER.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
     }
@@ -1296,7 +1236,7 @@ public class TSServiceImpl implements TSIService.Iface {
   @Override
   public TSGetTimeZoneResp getTimeZone(long sessionId) {
     try {
-      ZoneId zoneId = sessionIdZoneIdMap.get(sessionId);
+      ZoneId zoneId = sessionManager.getZoneId(sessionId);
       return new TSGetTimeZoneResp(
           RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
           zoneId != null ? zoneId.toString() : "Unknown time zone");
@@ -1311,7 +1251,7 @@ public class TSServiceImpl implements TSIService.Iface {
   @Override
   public TSStatus setTimeZone(TSSetTimeZoneReq req) {
     try {
-      sessionIdZoneIdMap.put(req.getSessionId(), ZoneId.of(req.getTimeZone()));
+      sessionManager.setTimezone(req.sessionId, req.timeZone);
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
     } catch (Exception e) {
       return onNPEOrUnexpectedException(e, "setting time zone", TSStatusCode.SET_TIME_ZONE_ERROR);
@@ -1882,11 +1822,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public long requestStatementId(long sessionId) {
-    long statementId = statementIdGenerator.incrementAndGet();
-    sessionId2StatementId
-        .computeIfAbsent(sessionId, s -> new CopyOnWriteArraySet<>())
-        .add(statementId);
-    return statementId;
+    return sessionManager.requestStatementId(sessionId);
   }
 
   @Override
@@ -1971,7 +1907,7 @@ public class TSServiceImpl implements TSIService.Iface {
   private TSStatus checkAuthority(PhysicalPlan plan, long sessionId) {
     List<PartialPath> paths = plan.getPaths();
     try {
-      if (!checkAuthorization(paths, plan, sessionIdUsernameMap.get(sessionId))) {
+      if (!checkAuthorization(paths, plan, sessionManager.getUsername(sessionId))) {
         return RpcUtils.getStatus(
             TSStatusCode.NO_PERMISSION_ERROR,
             "No permissions for this operation " + plan.getOperatorType());
@@ -2000,11 +1936,6 @@ public class TSServiceImpl implements TSIService.Iface {
         : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
   }
 
-  private long generateQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
-    return QueryResourceManager.getInstance()
-        .assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
-  }
-
   protected List<TSDataType> getSeriesTypesByPaths(
       List<PartialPath> paths, List<String> aggregations) throws MetadataException {
     return SchemaUtils.getSeriesTypesByPaths(paths, aggregations);