You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/07/15 02:48:40 UTC

[iotdb] branch rel/0.12 updated: [To rel/0.12] [ISSUE-3545] Bug: Time interval value is disorder in group by month (#3566)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new eb8220e  [To rel/0.12] [ISSUE-3545] Bug: Time interval value is disorder in group by month (#3566)
eb8220e is described below

commit eb8220ef69fc6f38a8725b3257aedc9d00bf8bc5
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Thu Jul 15 10:48:06 2021 +0800

    [To rel/0.12] [ISSUE-3545] Bug: Time interval value is disorder in group by month (#3566)
---
 .../apache/iotdb/db/qp/utils/DatetimeUtils.java    |   2 +
 .../iotdb/db/query/control/SessionManager.java     | 199 +++++++++++++++++++++
 .../dataset/groupby/GroupByEngineDataSet.java      |   2 +
 .../iotdb/db/query/executor/QueryRouter.java       |   4 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 170 +++++-------------
 .../iotdb/db/integration/IoTDBGroupByMonthIT.java  |   6 +
 .../tsfile/read/filter/GroupByMonthFilter.java     |   6 +-
 .../tsfile/read/filter/GroupByMonthFilterTest.java |  26 ++-
 8 files changed, 284 insertions(+), 131 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/utils/DatetimeUtils.java b/server/src/main/java/org/apache/iotdb/db/qp/utils/DatetimeUtils.java
index 64698ef..1f5a425 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/utils/DatetimeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/utils/DatetimeUtils.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.qp.utils;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.LogicalOperatorException;
+import org.apache.iotdb.db.query.control.SessionManager;
 
 import java.time.DateTimeException;
 import java.time.Instant;
@@ -570,6 +571,7 @@ public class DatetimeUtils {
           res *= 30 * 86_400_000L;
         } else {
           Calendar calendar = Calendar.getInstance();
+          calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone());
           calendar.setTimeInMillis(currentTime);
           calendar.add(Calendar.MONTH, (int) (value));
           res = calendar.getTimeInMillis() - currentTime;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
new file mode 100644
index 0000000..91b37da
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control;
+
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.query.dataset.UDTFDataSet;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SessionManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SessionManager.class);
+
+  // When the client abnormally exits, we can still know who to disconnect
+  private final ThreadLocal<Long> currSessionId = new ThreadLocal<>();
+  // Record the username for every rpc connection (session).
+  private final Map<Long, String> sessionIdToUsername = new ConcurrentHashMap<>();
+  private final Map<Long, ZoneId> sessionIdToZoneId = new ConcurrentHashMap<>();
+
+  // The sessionId is unique in one IoTDB instance.
+  private final AtomicLong sessionIdGenerator = new AtomicLong();
+  // The statementId is unique in one IoTDB instance.
+  private final AtomicLong statementIdGenerator = new AtomicLong();
+
+  // (sessionId -> Set(statementId))
+  private final Map<Long, Set<Long>> sessionIdToStatementId = new ConcurrentHashMap<>();
+  // (statementId -> Set(queryId))
+  private final Map<Long, Set<Long>> statementIdToQueryId = new ConcurrentHashMap<>();
+  // (queryId -> QueryDataSet)
+  private final Map<Long, QueryDataSet> queryIdToDataSet = new ConcurrentHashMap<>();
+
+  private SessionManager() {
+    // singleton
+  }
+
+  public Long getCurrSessionId() {
+    return currSessionId.get();
+  }
+
+  public void removeCurrSessionId() {
+    currSessionId.remove();
+  }
+
+  public TimeZone getCurrSessionTimeZone() {
+    if (getCurrSessionId() != null) {
+      return TimeZone.getTimeZone(SessionManager.getInstance().getZoneId(getCurrSessionId()));
+    } else {
+      // only used for test
+      return TimeZone.getTimeZone("+08:00");
+    }
+  }
+
+  public long requestSessionId(String username, String zoneId) {
+    long sessionId = sessionIdGenerator.incrementAndGet();
+    currSessionId.set(sessionId);
+    sessionIdToUsername.put(sessionId, username);
+    sessionIdToZoneId.put(sessionId, ZoneId.of(zoneId));
+
+    return sessionId;
+  }
+
+  public boolean releaseSessionResource(long sessionId) {
+    sessionIdToZoneId.remove(sessionId);
+
+    for (long statementId :
+        sessionIdToStatementId.getOrDefault(sessionId, Collections.emptySet())) {
+      for (long queryId : statementIdToQueryId.getOrDefault(statementId, Collections.emptySet())) {
+        releaseQueryResourceNoExceptions(queryId);
+      }
+    }
+
+    return sessionIdToUsername.remove(sessionId) != null;
+  }
+
+  public long requestStatementId(long sessionId) {
+    long statementId = statementIdGenerator.incrementAndGet();
+    sessionIdToStatementId
+        .computeIfAbsent(sessionId, s -> new CopyOnWriteArraySet<>())
+        .add(statementId);
+    return statementId;
+  }
+
+  public void closeStatement(long sessionId, long statementId) {
+    Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
+    if (queryIdSet != null) {
+      for (long queryId : queryIdSet) {
+        releaseQueryResourceNoExceptions(queryId);
+      }
+    }
+
+    if (sessionIdToStatementId.containsKey(sessionId)) {
+      sessionIdToStatementId.get(sessionId).remove(statementId);
+    }
+  }
+
+  public long requestQueryId(
+      Long statementId, boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
+    long queryId = requestQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
+    statementIdToQueryId
+        .computeIfAbsent(statementId, k -> new CopyOnWriteArraySet<>())
+        .add(queryId);
+    return queryId;
+  }
+
+  public long requestQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
+    return QueryResourceManager.getInstance()
+        .assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
+  }
+
+  public void releaseQueryResource(long queryId) throws StorageEngineException {
+    QueryDataSet dataSet = queryIdToDataSet.remove(queryId);
+    if (dataSet instanceof UDTFDataSet) {
+      ((UDTFDataSet) dataSet).finalizeUDFs(queryId);
+    }
+    QueryResourceManager.getInstance().endQuery(queryId);
+  }
+
+  public void releaseQueryResourceNoExceptions(long queryId) {
+    if (queryId != -1) {
+      try {
+        releaseQueryResource(queryId);
+      } catch (Exception e) {
+        LOGGER.warn("Error occurred while releasing query resource: ", e);
+      }
+    }
+  }
+
+  public String getUsername(Long sessionId) {
+    return sessionIdToUsername.get(sessionId);
+  }
+
+  public ZoneId getZoneId(Long sessionId) {
+    return sessionIdToZoneId.get(sessionId);
+  }
+
+  public void setTimezone(Long sessionId, String zone) {
+    sessionIdToZoneId.put(sessionId, ZoneId.of(zone));
+  }
+
+  public boolean hasDataset(Long queryId) {
+    return queryIdToDataSet.containsKey(queryId);
+  }
+
+  public QueryDataSet getDataset(Long queryId) {
+    return queryIdToDataSet.get(queryId);
+  }
+
+  public void setDataset(Long queryId, QueryDataSet dataSet) {
+    queryIdToDataSet.put(queryId, dataSet);
+  }
+
+  public void removeDataset(Long queryId) {
+    queryIdToDataSet.remove(queryId);
+  }
+
+  public void closeDataset(Long statementId, Long queryId) {
+    releaseQueryResourceNoExceptions(queryId);
+    if (statementIdToQueryId.containsKey(statementId)) {
+      statementIdToQueryId.get(statementId).remove(queryId);
+    }
+  }
+
+  public static SessionManager getInstance() {
+    return SessionManagerHelper.INSTANCE;
+  }
+
+  private static class SessionManagerHelper {
+
+    private static final SessionManager INSTANCE = new SessionManager();
+
+    private SessionManagerHelper() {}
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
index 4127b1a..7194633 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.dataset.groupby;
 
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
@@ -159,6 +160,7 @@ public abstract class GroupByEngineDataSet extends QueryDataSet {
    */
   public long calcIntervalByMonth(long numMonths) {
     Calendar calendar = Calendar.getInstance();
+    calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone());
     calendar.setTimeInMillis(startTime);
     calendar.add(Calendar.MONTH, (int) (numMonths));
     return calendar.getTimeInMillis();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index d7aa307..77bfd59 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.query.dataset.groupby.GroupByEngineDataSet;
 import org.apache.iotdb.db.query.dataset.groupby.GroupByFillDataSet;
 import org.apache.iotdb.db.query.dataset.groupby.GroupByTimeDataSet;
@@ -198,7 +199,8 @@ public class QueryRouter implements IQueryRouter {
               plan.getStartTime(),
               plan.getEndTime(),
               plan.isSlidingStepByMonth(),
-              plan.isIntervalByMonth())));
+              plan.isIntervalByMonth(),
+              SessionManager.getInstance().getCurrSessionTimeZone())));
     } else {
       return new GlobalTimeExpression(
           new GroupByFilter(
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 67f0467..537250c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -72,13 +72,12 @@ import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.control.QueryTimeManager;
+import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.query.control.TracingManager;
 import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
 import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
 import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
-import org.apache.iotdb.db.query.dataset.UDTFDataSet;
 import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
@@ -141,19 +140,15 @@ import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
-import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 /** Thrift RPC implementation at server side. */
@@ -191,32 +186,14 @@ public class TSServiceImpl implements TSIService.Iface {
   protected Planner processor;
   protected IPlanExecutor executor;
 
-  // Record the username for every rpc connection (session).
-  private final Map<Long, String> sessionIdUsernameMap = new ConcurrentHashMap<>();
-  private final Map<Long, ZoneId> sessionIdZoneIdMap = new ConcurrentHashMap<>();
-
-  // The sessionId is unique in one IoTDB instance.
-  private final AtomicLong sessionIdGenerator = new AtomicLong();
-  // The statementId is unique in one IoTDB instance.
-  private final AtomicLong statementIdGenerator = new AtomicLong();
-
-  // (sessionId -> Set(statementId))
-  private final Map<Long, Set<Long>> sessionId2StatementId = new ConcurrentHashMap<>();
-  // (statementId -> Set(queryId))
-  private final Map<Long, Set<Long>> statementId2QueryId = new ConcurrentHashMap<>();
-  // (queryId -> QueryDataSet)
-  private final Map<Long, QueryDataSet> queryId2DataSet = new ConcurrentHashMap<>();
-
-  // When the client abnormally exits, we can still know who to disconnect
-  private final ThreadLocal<Long> currSessionId = new ThreadLocal<>();
+  private final SessionManager sessionManager = SessionManager.getInstance();
+  private final QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
 
   public static final TSProtocolVersion CURRENT_RPC_VERSION =
       TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
 
   private static final AtomicInteger queryCount = new AtomicInteger(0);
 
-  private QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
-
   public TSServiceImpl() throws QueryProcessException {
     processor = new Planner();
     executor = new PlanExecutor();
@@ -273,10 +250,8 @@ public class TSServiceImpl implements TSIService.Iface {
       }
 
       tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Login successfully");
-      sessionId = sessionIdGenerator.incrementAndGet();
-      sessionIdUsernameMap.put(sessionId, req.getUsername());
-      sessionIdZoneIdMap.put(sessionId, ZoneId.of(req.getZoneId()));
-      currSessionId.set(sessionId);
+
+      sessionId = sessionManager.requestSessionId(req.getUsername(), req.getZoneId());
       AUDIT_LOGGER.info("User {} opens Session-{}", req.getUsername(), sessionId);
       LOGGER.info(
           "{}: Login status: {}. User : {}",
@@ -305,17 +280,10 @@ public class TSServiceImpl implements TSIService.Iface {
     long sessionId = req.getSessionId();
     AUDIT_LOGGER.info("Session-{} is closing", sessionId);
 
-    currSessionId.remove();
-    sessionIdZoneIdMap.remove(sessionId);
-
-    for (long statementId : sessionId2StatementId.getOrDefault(sessionId, Collections.emptySet())) {
-      for (long queryId : statementId2QueryId.getOrDefault(statementId, Collections.emptySet())) {
-        releaseQueryResourceNoExceptions(queryId);
-      }
-    }
+    sessionManager.removeCurrSessionId();
 
     return new TSStatus(
-        sessionIdUsernameMap.remove(sessionId) == null
+        !sessionManager.releaseSessionResource(sessionId)
             ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
             : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
   }
@@ -336,29 +304,14 @@ public class TSServiceImpl implements TSIService.Iface {
       AUDIT_LOGGER.debug(
           "{}: receive close operation from Session {}",
           IoTDBConstant.GLOBAL_DB_NAME,
-          currSessionId.get());
+          sessionManager.getCurrSessionId());
     }
 
     try {
-      // ResultSet close
       if (req.isSetStatementId() && req.isSetQueryId()) {
-        releaseQueryResourceNoExceptions(req.queryId);
-        // clear the statementId2QueryId map
-        if (statementId2QueryId.containsKey(req.getStatementId())) {
-          statementId2QueryId.get(req.getStatementId()).remove(req.getQueryId());
-        }
+        sessionManager.closeDataset(req.statementId, req.queryId);
       } else {
-        // statement close
-        Set<Long> queryIdSet = statementId2QueryId.remove(req.getStatementId());
-        if (queryIdSet != null) {
-          for (long queryId : queryIdSet) {
-            releaseQueryResourceNoExceptions(queryId);
-          }
-        }
-        // clear the sessionId2StatementId map
-        if (sessionId2StatementId.containsKey(req.getSessionId())) {
-          sessionId2StatementId.get(req.getSessionId()).remove(req.getStatementId());
-        }
+        sessionManager.closeStatement(req.sessionId, req.statementId);
       }
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
     } catch (Exception e) {
@@ -369,22 +322,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
   /** release single operation resource */
   protected void releaseQueryResource(long queryId) throws StorageEngineException {
-    // remove the corresponding Physical Plan
-    QueryDataSet dataSet = queryId2DataSet.remove(queryId);
-    if (dataSet instanceof UDTFDataSet) {
-      ((UDTFDataSet) dataSet).finalizeUDFs(queryId);
-    }
-    QueryResourceManager.getInstance().endQuery(queryId);
-  }
-
-  private void releaseQueryResourceNoExceptions(long queryId) {
-    if (queryId != -1) {
-      try {
-        releaseQueryResource(queryId);
-      } catch (Exception e) {
-        LOGGER.warn("Error occurred while releasing query resource: ", e);
-      }
-    }
+    sessionManager.releaseQueryResource(queryId);
   }
 
   @Override
@@ -551,7 +489,7 @@ public class TSServiceImpl implements TSIService.Iface {
       try {
         PhysicalPlan physicalPlan =
             processor.parseSQLToPhysicalPlan(
-                statement, sessionIdZoneIdMap.get(req.getSessionId()), DEFAULT_FETCH_SIZE);
+                statement, sessionManager.getZoneId(req.sessionId), DEFAULT_FETCH_SIZE);
         if (physicalPlan.isQuery()) {
           throw new QueryInBatchStatementException(statement);
         }
@@ -649,7 +587,7 @@ public class TSServiceImpl implements TSIService.Iface {
       String statement = req.getStatement();
       PhysicalPlan physicalPlan =
           processor.parseSQLToPhysicalPlan(
-              statement, sessionIdZoneIdMap.get(req.getSessionId()), req.fetchSize);
+              statement, sessionManager.getZoneId(req.getSessionId()), req.fetchSize);
 
       return physicalPlan.isQuery()
           ? internalExecuteQueryStatement(
@@ -658,7 +596,7 @@ public class TSServiceImpl implements TSIService.Iface {
               physicalPlan,
               req.fetchSize,
               req.timeout,
-              sessionIdUsernameMap.get(req.getSessionId()),
+              sessionManager.getUsername(req.getSessionId()),
               req.isEnableRedirectQuery())
           : executeUpdateStatement(physicalPlan, req.getSessionId());
     } catch (InterruptedException e) {
@@ -680,7 +618,7 @@ public class TSServiceImpl implements TSIService.Iface {
       String statement = req.getStatement();
       PhysicalPlan physicalPlan =
           processor.parseSQLToPhysicalPlan(
-              statement, sessionIdZoneIdMap.get(req.getSessionId()), req.fetchSize);
+              statement, sessionManager.getZoneId(req.sessionId), req.fetchSize);
 
       return physicalPlan.isQuery()
           ? internalExecuteQueryStatement(
@@ -689,7 +627,7 @@ public class TSServiceImpl implements TSIService.Iface {
               physicalPlan,
               req.fetchSize,
               req.timeout,
-              sessionIdUsernameMap.get(req.getSessionId()),
+              sessionManager.getUsername(req.getSessionId()),
               req.isEnableRedirectQuery())
           : RpcUtils.getTSExecuteStatementResp(
               TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
@@ -712,7 +650,7 @@ public class TSServiceImpl implements TSIService.Iface {
       }
 
       PhysicalPlan physicalPlan =
-          processor.rawDataQueryReqToPhysicalPlan(req, sessionIdZoneIdMap.get(req.getSessionId()));
+          processor.rawDataQueryReqToPhysicalPlan(req, sessionManager.getZoneId(req.sessionId));
       return physicalPlan.isQuery()
           ? internalExecuteQueryStatement(
               "",
@@ -720,7 +658,7 @@ public class TSServiceImpl implements TSIService.Iface {
               physicalPlan,
               req.fetchSize,
               config.getQueryTimeoutThreshold(),
-              sessionIdUsernameMap.get(req.getSessionId()),
+              sessionManager.getUsername(req.sessionId),
               req.isEnableRedirectQuery())
           : RpcUtils.getTSExecuteStatementResp(
               TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
@@ -752,7 +690,8 @@ public class TSServiceImpl implements TSIService.Iface {
           QueryFilterOptimizationException, MetadataException, IOException, InterruptedException,
           TException, AuthException {
     queryCount.incrementAndGet();
-    AUDIT_LOGGER.debug("Session {} execute Query: {}", currSessionId.get(), statement);
+    AUDIT_LOGGER.debug(
+        "Session {} execute Query: {}", sessionManager.getCurrSessionId(), statement);
     long startTime = System.currentTimeMillis();
     long queryId = -1;
     try {
@@ -762,7 +701,7 @@ public class TSServiceImpl implements TSIService.Iface {
       fetchSize = p.left;
 
       // generate the queryId for the operation
-      queryId = generateQueryId(true, fetchSize, p.right);
+      queryId = sessionManager.requestQueryId(statementId, true, fetchSize, p.right);
       // register query info to queryTimeManager
       if (!(plan instanceof ShowQueryProcesslistPlan)) {
         queryTimeManager.registerQuery(queryId, startTime, statement, timeout);
@@ -776,10 +715,6 @@ public class TSServiceImpl implements TSIService.Iface {
         }
       }
 
-      statementId2QueryId
-          .computeIfAbsent(statementId, k -> new CopyOnWriteArraySet<>())
-          .add(queryId);
-
       if (plan instanceof AuthorPlan) {
         plan.setLoginUserName(username);
       }
@@ -870,7 +805,7 @@ public class TSServiceImpl implements TSIService.Iface {
       }
       return resp;
     } catch (Exception e) {
-      releaseQueryResourceNoExceptions(queryId);
+      sessionManager.releaseQueryResourceNoExceptions(queryId);
       throw e;
     } finally {
       Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, startTime);
@@ -1105,7 +1040,7 @@ public class TSServiceImpl implements TSIService.Iface {
         return RpcUtils.getTSFetchResultsResp(TSStatusCode.NOT_LOGIN_ERROR);
       }
 
-      if (!queryId2DataSet.containsKey(req.queryId)) {
+      if (!sessionManager.hasDataset(req.queryId)) {
         return RpcUtils.getTSFetchResultsResp(
             RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "Has not executed query"));
       }
@@ -1114,13 +1049,14 @@ public class TSServiceImpl implements TSIService.Iface {
       queryTimeManager.registerQuery(
           req.queryId, System.currentTimeMillis(), req.statement, req.timeout);
 
-      QueryDataSet queryDataSet = queryId2DataSet.get(req.queryId);
+      QueryDataSet queryDataSet = sessionManager.getDataset(req.queryId);
       if (req.isAlign) {
         TSQueryDataSet result =
-            fillRpcReturnData(req.fetchSize, queryDataSet, sessionIdUsernameMap.get(req.sessionId));
+            fillRpcReturnData(
+                req.fetchSize, queryDataSet, sessionManager.getUsername(req.sessionId));
         boolean hasResultSet = result.bufferForTime().limit() != 0;
         if (!hasResultSet) {
-          releaseQueryResourceNoExceptions(req.queryId);
+          sessionManager.releaseQueryResourceNoExceptions(req.queryId);
         }
         TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
         resp.setHasResultSet(hasResultSet);
@@ -1132,7 +1068,7 @@ public class TSServiceImpl implements TSIService.Iface {
       } else {
         TSQueryNonAlignDataSet nonAlignResult =
             fillRpcNonAlignReturnData(
-                req.fetchSize, queryDataSet, sessionIdUsernameMap.get(req.sessionId));
+                req.fetchSize, queryDataSet, sessionManager.getUsername(req.sessionId));
         boolean hasResultSet = false;
         for (ByteBuffer timeBuffer : nonAlignResult.getTimeList()) {
           if (timeBuffer.limit() != 0) {
@@ -1141,7 +1077,7 @@ public class TSServiceImpl implements TSIService.Iface {
           }
         }
         if (!hasResultSet) {
-          queryId2DataSet.remove(req.queryId);
+          sessionManager.removeDataset(req.queryId);
         }
         TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
         resp.setHasResultSet(hasResultSet);
@@ -1158,7 +1094,7 @@ public class TSServiceImpl implements TSIService.Iface {
           onNPEOrUnexpectedException(
               e, "executing fetchResults", TSStatusCode.INTERNAL_SERVER_ERROR));
     } catch (Exception e) {
-      releaseQueryResourceNoExceptions(req.queryId);
+      sessionManager.releaseQueryResourceNoExceptions(req.queryId);
       return RpcUtils.getTSFetchResultsResp(
           onNPEOrUnexpectedException(
               e, "executing fetchResults", TSStatusCode.INTERNAL_SERVER_ERROR));
@@ -1210,7 +1146,7 @@ public class TSServiceImpl implements TSIService.Iface {
     QueryContext context = genQueryContext(queryId, physicalPlan.isDebug());
     QueryDataSet queryDataSet = executor.processQuery(physicalPlan, context);
     queryDataSet.setFetchSize(fetchSize);
-    queryId2DataSet.put(queryId, queryDataSet);
+    sessionManager.setDataset(queryId, queryDataSet);
     return queryDataSet;
   }
 
@@ -1239,7 +1175,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
     status = executeNonQueryPlan(plan);
     TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(status);
-    long queryId = generateQueryId(false, DEFAULT_FETCH_SIZE, -1);
+    long queryId = sessionManager.requestQueryId(false, DEFAULT_FETCH_SIZE, -1);
     return resp.setQueryId(queryId);
   }
 
@@ -1256,7 +1192,7 @@ public class TSServiceImpl implements TSIService.Iface {
       throws QueryProcessException {
     PhysicalPlan physicalPlan =
         processor.parseSQLToPhysicalPlan(
-            statement, sessionIdZoneIdMap.get(sessionId), DEFAULT_FETCH_SIZE);
+            statement, sessionManager.getZoneId(sessionId), DEFAULT_FETCH_SIZE);
     return physicalPlan.isQuery()
         ? RpcUtils.getTSExecuteStatementResp(
             TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is a query statement.")
@@ -1269,7 +1205,7 @@ public class TSServiceImpl implements TSIService.Iface {
    * @return true: If logged in; false: If not logged in
    */
   private boolean checkLogin(long sessionId) {
-    boolean isLoggedIn = sessionIdUsernameMap.get(sessionId) != null;
+    boolean isLoggedIn = sessionManager.getUsername(sessionId) != null;
     if (!isLoggedIn) {
       LOGGER.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
     }
@@ -1286,7 +1222,7 @@ public class TSServiceImpl implements TSIService.Iface {
   }
 
   protected void handleClientExit() {
-    Long sessionId = currSessionId.get();
+    Long sessionId = sessionManager.getCurrSessionId();
     if (sessionId != null) {
       TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
       closeSession(req);
@@ -1296,7 +1232,7 @@ public class TSServiceImpl implements TSIService.Iface {
   @Override
   public TSGetTimeZoneResp getTimeZone(long sessionId) {
     try {
-      ZoneId zoneId = sessionIdZoneIdMap.get(sessionId);
+      ZoneId zoneId = sessionManager.getZoneId(sessionId);
       return new TSGetTimeZoneResp(
           RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
           zoneId != null ? zoneId.toString() : "Unknown time zone");
@@ -1311,7 +1247,7 @@ public class TSServiceImpl implements TSIService.Iface {
   @Override
   public TSStatus setTimeZone(TSSetTimeZoneReq req) {
     try {
-      sessionIdZoneIdMap.put(req.getSessionId(), ZoneId.of(req.getTimeZone()));
+      sessionManager.setTimezone(req.sessionId, req.timeZone);
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
     } catch (Exception e) {
       return onNPEOrUnexpectedException(e, "setting time zone", TSStatusCode.SET_TIME_ZONE_ERROR);
@@ -1340,7 +1276,7 @@ public class TSServiceImpl implements TSIService.Iface {
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, first device {}, first time {}",
-          currSessionId.get(),
+          sessionManager.getCurrSessionId(),
           req.deviceIds.get(0),
           req.getTimestamps().get(0));
     }
@@ -1406,7 +1342,7 @@ public class TSServiceImpl implements TSIService.Iface {
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, device {}, first time {}",
-          currSessionId.get(),
+          sessionManager.getCurrSessionId(),
           req.deviceId,
           req.getTimestamps().get(0));
     }
@@ -1448,7 +1384,7 @@ public class TSServiceImpl implements TSIService.Iface {
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session {} insertRecords, first device {}, first time {}",
-          currSessionId.get(),
+          sessionManager.getCurrSessionId(),
           req.deviceIds.get(0),
           req.getTimestamps().get(0));
     }
@@ -1554,7 +1490,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
       AUDIT_LOGGER.debug(
           "Session {} insertRecord, device {}, time {}",
-          currSessionId.get(),
+          sessionManager.getCurrSessionId(),
           req.getDeviceId(),
           req.getTimestamp());
 
@@ -1582,7 +1518,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
       AUDIT_LOGGER.debug(
           "Session {} insertRecord, device {}, time {}",
-          currSessionId.get(),
+          sessionManager.getCurrSessionId(),
           req.getDeviceId(),
           req.getTimestamp());
 
@@ -1753,7 +1689,8 @@ public class TSServiceImpl implements TSIService.Iface {
       }
 
       if (AUDIT_LOGGER.isDebugEnabled()) {
-        AUDIT_LOGGER.debug("Session-{} create timeseries {}", currSessionId.get(), req.getPath());
+        AUDIT_LOGGER.debug(
+            "Session-{} create timeseries {}", sessionManager.getCurrSessionId(), req.getPath());
       }
 
       CreateTimeSeriesPlan plan =
@@ -1786,7 +1723,7 @@ public class TSServiceImpl implements TSIService.Iface {
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session-{} create {} timeseries, the first is {}",
-            currSessionId.get(),
+            sessionManager.getCurrSessionId(),
             req.getPaths().size(),
             req.getPaths().get(0));
       }
@@ -1882,11 +1819,7 @@ public class TSServiceImpl implements TSIService.Iface {
 
   @Override
   public long requestStatementId(long sessionId) {
-    long statementId = statementIdGenerator.incrementAndGet();
-    sessionId2StatementId
-        .computeIfAbsent(sessionId, s -> new CopyOnWriteArraySet<>())
-        .add(statementId);
-    return statementId;
+    return sessionManager.requestStatementId(sessionId);
   }
 
   @Override
@@ -1899,7 +1832,7 @@ public class TSServiceImpl implements TSIService.Iface {
       if (AUDIT_LOGGER.isDebugEnabled()) {
         AUDIT_LOGGER.debug(
             "Session-{} create device template {}.{}.{}.{}.{}.{}",
-            currSessionId.get(),
+            sessionManager.getCurrSessionId(),
             req.getName(),
             req.getSchemaNames(),
             req.getMeasurements(),
@@ -1957,7 +1890,7 @@ public class TSServiceImpl implements TSIService.Iface {
     if (AUDIT_LOGGER.isDebugEnabled()) {
       AUDIT_LOGGER.debug(
           "Session-{} set device template {}.{}",
-          currSessionId.get(),
+          sessionManager.getCurrSessionId(),
           req.getTemplateName(),
           req.getPrefixPath());
     }
@@ -1971,7 +1904,7 @@ public class TSServiceImpl implements TSIService.Iface {
   private TSStatus checkAuthority(PhysicalPlan plan, long sessionId) {
     List<PartialPath> paths = plan.getPaths();
     try {
-      if (!checkAuthorization(paths, plan, sessionIdUsernameMap.get(sessionId))) {
+      if (!checkAuthorization(paths, plan, sessionManager.getUsername(sessionId))) {
         return RpcUtils.getStatus(
             TSStatusCode.NO_PERMISSION_ERROR,
             "No permissions for this operation " + plan.getOperatorType());
@@ -2000,11 +1933,6 @@ public class TSServiceImpl implements TSIService.Iface {
         : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
   }
 
-  private long generateQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
-    return QueryResourceManager.getInstance()
-        .assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
-  }
-
   protected List<TSDataType> getSeriesTypesByPaths(
       List<PartialPath> paths, List<String> aggregations) throws MetadataException {
     return SchemaUtils.getSeriesTypesByPaths(paths, aggregations);
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByMonthIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByMonthIT.java
index 76e0dba..6292967 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByMonthIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByMonthIT.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.jdbc.IoTDBConnection;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -81,6 +82,7 @@ public class IoTDBGroupByMonthIT {
         "02/28/2021:00:00:00", "1.0"
       };
 
+      ((IoTDBConnection) connection).setTimeZone("GMT+00:00");
       boolean hasResultSet =
           statement.execute(
               "select sum(temperature) from root.sg1.d1 "
@@ -121,6 +123,7 @@ public class IoTDBGroupByMonthIT {
         "02/28/2021:00:00:00", "1.0"
       };
 
+      ((IoTDBConnection) connection).setTimeZone("GMT+00:00");
       boolean hasResultSet =
           statement.execute(
               "select sum(temperature) from root.sg1.d1 "
@@ -153,6 +156,7 @@ public class IoTDBGroupByMonthIT {
             DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
         Statement statement = connection.createStatement()) {
 
+      ((IoTDBConnection) connection).setTimeZone("GMT+00:00");
       boolean hasResultSet =
           statement.execute(
               "select sum(temperature) from root.sg1.d1 "
@@ -186,6 +190,7 @@ public class IoTDBGroupByMonthIT {
         "02/28/2021:00:00:00", "31.0"
       };
 
+      ((IoTDBConnection) connection).setTimeZone("GMT+00:00");
       boolean hasResultSet =
           statement.execute(
               "select sum(temperature) from root.sg1.d1 GROUP BY ([1612051200000, 1617148800000), 1mo)");
@@ -231,6 +236,7 @@ public class IoTDBGroupByMonthIT {
             DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
         Statement statement = connection.createStatement()) {
 
+      ((IoTDBConnection) connection).setTimeZone("GMT+00:00");
       boolean hasResultSet =
           statement.execute(
               "select sum(temperature) from root.sg1.d1 GROUP BY ([now() - 1mo, now()), 1d)");
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilter.java
index df64437..213d520 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilter.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.util.Calendar;
 import java.util.Objects;
+import java.util.TimeZone;
 
 /**
  * GroupByMonthFilter is used to handle natural month slidingStep and interval by generating
@@ -44,9 +45,11 @@ public class GroupByMonthFilter extends GroupByFilter {
       long startTime,
       long endTime,
       boolean isSlidingStepByMonth,
-      boolean isIntervalByMonth) {
+      boolean isIntervalByMonth,
+      TimeZone timeZone) {
     super(interval, slidingStep, startTime, endTime);
     initialStartTime = startTime;
+    calendar.setTimeZone(timeZone);
     calendar.setTimeInMillis(startTime);
     this.isIntervalByMonth = isIntervalByMonth;
     this.isSlidingStepByMonth = isSlidingStepByMonth;
@@ -68,6 +71,7 @@ public class GroupByMonthFilter extends GroupByFilter {
     slidingStepsInMo = filter.slidingStepsInMo;
     initialStartTime = filter.initialStartTime;
     calendar = Calendar.getInstance();
+    calendar.setTimeZone(filter.calendar.getTimeZone());
     calendar.setTimeInMillis(filter.calendar.getTimeInMillis());
   }
 
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilterTest.java
index d816090..a374c74 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilterTest.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import org.junit.Test;
 
+import java.util.TimeZone;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -41,7 +43,8 @@ public class GroupByMonthFilterTest {
   @Test
   public void TestSatisfy1() {
     GroupByMonthFilter filter =
-        new GroupByMonthFilter(MS_TO_MONTH, 2 * MS_TO_MONTH, 0, END_TIME, true, true);
+        new GroupByMonthFilter(
+            MS_TO_MONTH, 2 * MS_TO_MONTH, 0, END_TIME, true, true, TimeZone.getTimeZone("+08:00"));
 
     // 1970-01-01 08:00:00, timezone = GMT+08:00
     assertTrue(filter.satisfy(0, null));
@@ -75,7 +78,8 @@ public class GroupByMonthFilterTest {
   @Test
   public void TestSatisfy2() {
     GroupByMonthFilter filter =
-        new GroupByMonthFilter(MS_TO_MONTH, MS_TO_MONTH, 0, END_TIME, true, true);
+        new GroupByMonthFilter(
+            MS_TO_MONTH, MS_TO_MONTH, 0, END_TIME, true, true, TimeZone.getTimeZone("+08:00"));
 
     // 1970-01-01 08:00:00, timezone = GMT+08:00
     assertTrue(filter.satisfy(0, null));
@@ -106,7 +110,8 @@ public class GroupByMonthFilterTest {
   @Test
   public void TestSatisfy3() {
     GroupByMonthFilter filter =
-        new GroupByMonthFilter(MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false);
+        new GroupByMonthFilter(
+            MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false, TimeZone.getTimeZone("+08:00"));
 
     // 1970-01-01 08:00:00, timezone = GMT+08:00
     assertTrue(filter.satisfy(0, null));
@@ -137,7 +142,8 @@ public class GroupByMonthFilterTest {
   @Test
   public void TestSatisfy4() {
     GroupByMonthFilter filter =
-        new GroupByMonthFilter(MS_TO_MONTH, MS_TO_DAY * 100, 0, END_TIME, false, true);
+        new GroupByMonthFilter(
+            MS_TO_MONTH, MS_TO_DAY * 100, 0, END_TIME, false, true, TimeZone.getTimeZone("+08:00"));
 
     // 1970-01-01 08:00:00, timezone = GMT+08:00
     assertTrue(filter.satisfy(0, null));
@@ -156,7 +162,8 @@ public class GroupByMonthFilterTest {
   @Test
   public void TestSatisfyStartEndTime() {
     GroupByMonthFilter filter =
-        new GroupByMonthFilter(MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false);
+        new GroupByMonthFilter(
+            MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false, TimeZone.getTimeZone("+08:00"));
 
     // 1970-01-01 08:00:00 - 1970-01-02 08:00:00, timezone = GMT+08:00
     Statistics statistics = new LongStatistics();
@@ -202,7 +209,8 @@ public class GroupByMonthFilterTest {
   @Test
   public void TestContainStartEndTime() {
     GroupByMonthFilter filter =
-        new GroupByMonthFilter(MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false);
+        new GroupByMonthFilter(
+            MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false, TimeZone.getTimeZone("+08:00"));
 
     // 1970-01-01 08:00:00 - 1970-01-02 08:00:00, timezone = GMT+08:00
     assertFalse(filter.containStartEndTime(0, MS_TO_DAY));
@@ -238,11 +246,13 @@ public class GroupByMonthFilterTest {
   @Test
   public void TestEquals() {
     GroupByMonthFilter filter =
-        new GroupByMonthFilter(MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false);
+        new GroupByMonthFilter(
+            MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false, TimeZone.getTimeZone("+08:00"));
     Filter filter2 = filter.copy();
     assertEquals(filter, filter2);
     GroupByMonthFilter filter3 =
-        new GroupByMonthFilter(MS_TO_MONTH, MS_TO_MONTH, 0, END_TIME, true, true);
+        new GroupByMonthFilter(
+            MS_TO_MONTH, MS_TO_MONTH, 0, END_TIME, true, true, TimeZone.getTimeZone("+08:00"));
     assertNotEquals(filter, filter3);
   }
 }