You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/07/15 02:48:40 UTC
[iotdb] branch rel/0.12 updated: [To rel/0.12] [ISSUE-3545] Bug:
Time interval value is disorder in group by month (#3566)
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new eb8220e [To rel/0.12] [ISSUE-3545] Bug: Time interval value is disorder in group by month (#3566)
eb8220e is described below
commit eb8220ef69fc6f38a8725b3257aedc9d00bf8bc5
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Thu Jul 15 10:48:06 2021 +0800
[To rel/0.12] [ISSUE-3545] Bug: Time interval value is disorder in group by month (#3566)
---
.../apache/iotdb/db/qp/utils/DatetimeUtils.java | 2 +
.../iotdb/db/query/control/SessionManager.java | 199 +++++++++++++++++++++
.../dataset/groupby/GroupByEngineDataSet.java | 2 +
.../iotdb/db/query/executor/QueryRouter.java | 4 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 170 +++++-------------
.../iotdb/db/integration/IoTDBGroupByMonthIT.java | 6 +
.../tsfile/read/filter/GroupByMonthFilter.java | 6 +-
.../tsfile/read/filter/GroupByMonthFilterTest.java | 26 ++-
8 files changed, 284 insertions(+), 131 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/utils/DatetimeUtils.java b/server/src/main/java/org/apache/iotdb/db/qp/utils/DatetimeUtils.java
index 64698ef..1f5a425 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/utils/DatetimeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/utils/DatetimeUtils.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.qp.utils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.LogicalOperatorException;
+import org.apache.iotdb.db.query.control.SessionManager;
import java.time.DateTimeException;
import java.time.Instant;
@@ -570,6 +571,7 @@ public class DatetimeUtils {
res *= 30 * 86_400_000L;
} else {
Calendar calendar = Calendar.getInstance();
+ calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone());
calendar.setTimeInMillis(currentTime);
calendar.add(Calendar.MONTH, (int) (value));
res = calendar.getTimeInMillis() - currentTime;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
new file mode 100644
index 0000000..91b37da
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control;
+
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.query.dataset.UDTFDataSet;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SessionManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SessionManager.class);
+
+ // When the client abnormally exits, we can still know who to disconnect
+ private final ThreadLocal<Long> currSessionId = new ThreadLocal<>();
+ // Record the username for every rpc connection (session).
+ private final Map<Long, String> sessionIdToUsername = new ConcurrentHashMap<>();
+ private final Map<Long, ZoneId> sessionIdToZoneId = new ConcurrentHashMap<>();
+
+ // The sessionId is unique in one IoTDB instance.
+ private final AtomicLong sessionIdGenerator = new AtomicLong();
+ // The statementId is unique in one IoTDB instance.
+ private final AtomicLong statementIdGenerator = new AtomicLong();
+
+ // (sessionId -> Set(statementId))
+ private final Map<Long, Set<Long>> sessionIdToStatementId = new ConcurrentHashMap<>();
+ // (statementId -> Set(queryId))
+ private final Map<Long, Set<Long>> statementIdToQueryId = new ConcurrentHashMap<>();
+ // (queryId -> QueryDataSet)
+ private final Map<Long, QueryDataSet> queryIdToDataSet = new ConcurrentHashMap<>();
+
+ private SessionManager() {
+ // singleton
+ }
+
+ public Long getCurrSessionId() {
+ return currSessionId.get();
+ }
+
+ public void removeCurrSessionId() {
+ currSessionId.remove();
+ }
+
+ public TimeZone getCurrSessionTimeZone() {
+ if (getCurrSessionId() != null) {
+ return TimeZone.getTimeZone(SessionManager.getInstance().getZoneId(getCurrSessionId()));
+ } else {
+ // only used for test
+ return TimeZone.getTimeZone("+08:00");
+ }
+ }
+
+ public long requestSessionId(String username, String zoneId) {
+ long sessionId = sessionIdGenerator.incrementAndGet();
+ currSessionId.set(sessionId);
+ sessionIdToUsername.put(sessionId, username);
+ sessionIdToZoneId.put(sessionId, ZoneId.of(zoneId));
+
+ return sessionId;
+ }
+
+ public boolean releaseSessionResource(long sessionId) {
+ sessionIdToZoneId.remove(sessionId);
+
+ for (long statementId :
+ sessionIdToStatementId.getOrDefault(sessionId, Collections.emptySet())) {
+ for (long queryId : statementIdToQueryId.getOrDefault(statementId, Collections.emptySet())) {
+ releaseQueryResourceNoExceptions(queryId);
+ }
+ }
+
+ return sessionIdToUsername.remove(sessionId) != null;
+ }
+
+ public long requestStatementId(long sessionId) {
+ long statementId = statementIdGenerator.incrementAndGet();
+ sessionIdToStatementId
+ .computeIfAbsent(sessionId, s -> new CopyOnWriteArraySet<>())
+ .add(statementId);
+ return statementId;
+ }
+
+ public void closeStatement(long sessionId, long statementId) {
+ Set<Long> queryIdSet = statementIdToQueryId.remove(statementId);
+ if (queryIdSet != null) {
+ for (long queryId : queryIdSet) {
+ releaseQueryResourceNoExceptions(queryId);
+ }
+ }
+
+ if (sessionIdToStatementId.containsKey(sessionId)) {
+ sessionIdToStatementId.get(sessionId).remove(statementId);
+ }
+ }
+
+ public long requestQueryId(
+ Long statementId, boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
+ long queryId = requestQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
+ statementIdToQueryId
+ .computeIfAbsent(statementId, k -> new CopyOnWriteArraySet<>())
+ .add(queryId);
+ return queryId;
+ }
+
+ public long requestQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
+ return QueryResourceManager.getInstance()
+ .assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
+ }
+
+ public void releaseQueryResource(long queryId) throws StorageEngineException {
+ QueryDataSet dataSet = queryIdToDataSet.remove(queryId);
+ if (dataSet instanceof UDTFDataSet) {
+ ((UDTFDataSet) dataSet).finalizeUDFs(queryId);
+ }
+ QueryResourceManager.getInstance().endQuery(queryId);
+ }
+
+ public void releaseQueryResourceNoExceptions(long queryId) {
+ if (queryId != -1) {
+ try {
+ releaseQueryResource(queryId);
+ } catch (Exception e) {
+ LOGGER.warn("Error occurred while releasing query resource: ", e);
+ }
+ }
+ }
+
+ public String getUsername(Long sessionId) {
+ return sessionIdToUsername.get(sessionId);
+ }
+
+ public ZoneId getZoneId(Long sessionId) {
+ return sessionIdToZoneId.get(sessionId);
+ }
+
+ public void setTimezone(Long sessionId, String zone) {
+ sessionIdToZoneId.put(sessionId, ZoneId.of(zone));
+ }
+
+ public boolean hasDataset(Long queryId) {
+ return queryIdToDataSet.containsKey(queryId);
+ }
+
+ public QueryDataSet getDataset(Long queryId) {
+ return queryIdToDataSet.get(queryId);
+ }
+
+ public void setDataset(Long queryId, QueryDataSet dataSet) {
+ queryIdToDataSet.put(queryId, dataSet);
+ }
+
+ public void removeDataset(Long queryId) {
+ queryIdToDataSet.remove(queryId);
+ }
+
+ public void closeDataset(Long statementId, Long queryId) {
+ releaseQueryResourceNoExceptions(queryId);
+ if (statementIdToQueryId.containsKey(statementId)) {
+ statementIdToQueryId.get(statementId).remove(queryId);
+ }
+ }
+
+ public static SessionManager getInstance() {
+ return SessionManagerHelper.INSTANCE;
+ }
+
+ private static class SessionManagerHelper {
+
+ private static final SessionManager INSTANCE = new SessionManager();
+
+ private SessionManagerHelper() {}
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
index 4127b1a..7194633 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.dataset.groupby;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
@@ -159,6 +160,7 @@ public abstract class GroupByEngineDataSet extends QueryDataSet {
*/
public long calcIntervalByMonth(long numMonths) {
Calendar calendar = Calendar.getInstance();
+ calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone());
calendar.setTimeInMillis(startTime);
calendar.add(Calendar.MONTH, (int) (numMonths));
return calendar.getTimeInMillis();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index d7aa307..77bfd59 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.query.dataset.groupby.GroupByEngineDataSet;
import org.apache.iotdb.db.query.dataset.groupby.GroupByFillDataSet;
import org.apache.iotdb.db.query.dataset.groupby.GroupByTimeDataSet;
@@ -198,7 +199,8 @@ public class QueryRouter implements IQueryRouter {
plan.getStartTime(),
plan.getEndTime(),
plan.isSlidingStepByMonth(),
- plan.isIntervalByMonth())));
+ plan.isIntervalByMonth(),
+ SessionManager.getInstance().getCurrSessionTimeZone())));
} else {
return new GlobalTimeExpression(
new GroupByFilter(
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 67f0467..537250c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -72,13 +72,12 @@ import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.control.QueryTimeManager;
+import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.query.control.TracingManager;
import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
-import org.apache.iotdb.db.query.dataset.UDTFDataSet;
import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
@@ -141,19 +140,15 @@ import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
-import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/** Thrift RPC implementation at server side. */
@@ -191,32 +186,14 @@ public class TSServiceImpl implements TSIService.Iface {
protected Planner processor;
protected IPlanExecutor executor;
- // Record the username for every rpc connection (session).
- private final Map<Long, String> sessionIdUsernameMap = new ConcurrentHashMap<>();
- private final Map<Long, ZoneId> sessionIdZoneIdMap = new ConcurrentHashMap<>();
-
- // The sessionId is unique in one IoTDB instance.
- private final AtomicLong sessionIdGenerator = new AtomicLong();
- // The statementId is unique in one IoTDB instance.
- private final AtomicLong statementIdGenerator = new AtomicLong();
-
- // (sessionId -> Set(statementId))
- private final Map<Long, Set<Long>> sessionId2StatementId = new ConcurrentHashMap<>();
- // (statementId -> Set(queryId))
- private final Map<Long, Set<Long>> statementId2QueryId = new ConcurrentHashMap<>();
- // (queryId -> QueryDataSet)
- private final Map<Long, QueryDataSet> queryId2DataSet = new ConcurrentHashMap<>();
-
- // When the client abnormally exits, we can still know who to disconnect
- private final ThreadLocal<Long> currSessionId = new ThreadLocal<>();
+ private final SessionManager sessionManager = SessionManager.getInstance();
+ private final QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
public static final TSProtocolVersion CURRENT_RPC_VERSION =
TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
private static final AtomicInteger queryCount = new AtomicInteger(0);
- private QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
-
public TSServiceImpl() throws QueryProcessException {
processor = new Planner();
executor = new PlanExecutor();
@@ -273,10 +250,8 @@ public class TSServiceImpl implements TSIService.Iface {
}
tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Login successfully");
- sessionId = sessionIdGenerator.incrementAndGet();
- sessionIdUsernameMap.put(sessionId, req.getUsername());
- sessionIdZoneIdMap.put(sessionId, ZoneId.of(req.getZoneId()));
- currSessionId.set(sessionId);
+
+ sessionId = sessionManager.requestSessionId(req.getUsername(), req.getZoneId());
AUDIT_LOGGER.info("User {} opens Session-{}", req.getUsername(), sessionId);
LOGGER.info(
"{}: Login status: {}. User : {}",
@@ -305,17 +280,10 @@ public class TSServiceImpl implements TSIService.Iface {
long sessionId = req.getSessionId();
AUDIT_LOGGER.info("Session-{} is closing", sessionId);
- currSessionId.remove();
- sessionIdZoneIdMap.remove(sessionId);
-
- for (long statementId : sessionId2StatementId.getOrDefault(sessionId, Collections.emptySet())) {
- for (long queryId : statementId2QueryId.getOrDefault(statementId, Collections.emptySet())) {
- releaseQueryResourceNoExceptions(queryId);
- }
- }
+ sessionManager.removeCurrSessionId();
return new TSStatus(
- sessionIdUsernameMap.remove(sessionId) == null
+ !sessionManager.releaseSessionResource(sessionId)
? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)
: RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
}
@@ -336,29 +304,14 @@ public class TSServiceImpl implements TSIService.Iface {
AUDIT_LOGGER.debug(
"{}: receive close operation from Session {}",
IoTDBConstant.GLOBAL_DB_NAME,
- currSessionId.get());
+ sessionManager.getCurrSessionId());
}
try {
- // ResultSet close
if (req.isSetStatementId() && req.isSetQueryId()) {
- releaseQueryResourceNoExceptions(req.queryId);
- // clear the statementId2QueryId map
- if (statementId2QueryId.containsKey(req.getStatementId())) {
- statementId2QueryId.get(req.getStatementId()).remove(req.getQueryId());
- }
+ sessionManager.closeDataset(req.statementId, req.queryId);
} else {
- // statement close
- Set<Long> queryIdSet = statementId2QueryId.remove(req.getStatementId());
- if (queryIdSet != null) {
- for (long queryId : queryIdSet) {
- releaseQueryResourceNoExceptions(queryId);
- }
- }
- // clear the sessionId2StatementId map
- if (sessionId2StatementId.containsKey(req.getSessionId())) {
- sessionId2StatementId.get(req.getSessionId()).remove(req.getStatementId());
- }
+ sessionManager.closeStatement(req.sessionId, req.statementId);
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (Exception e) {
@@ -369,22 +322,7 @@ public class TSServiceImpl implements TSIService.Iface {
/** release single operation resource */
protected void releaseQueryResource(long queryId) throws StorageEngineException {
- // remove the corresponding Physical Plan
- QueryDataSet dataSet = queryId2DataSet.remove(queryId);
- if (dataSet instanceof UDTFDataSet) {
- ((UDTFDataSet) dataSet).finalizeUDFs(queryId);
- }
- QueryResourceManager.getInstance().endQuery(queryId);
- }
-
- private void releaseQueryResourceNoExceptions(long queryId) {
- if (queryId != -1) {
- try {
- releaseQueryResource(queryId);
- } catch (Exception e) {
- LOGGER.warn("Error occurred while releasing query resource: ", e);
- }
- }
+ sessionManager.releaseQueryResource(queryId);
}
@Override
@@ -551,7 +489,7 @@ public class TSServiceImpl implements TSIService.Iface {
try {
PhysicalPlan physicalPlan =
processor.parseSQLToPhysicalPlan(
- statement, sessionIdZoneIdMap.get(req.getSessionId()), DEFAULT_FETCH_SIZE);
+ statement, sessionManager.getZoneId(req.sessionId), DEFAULT_FETCH_SIZE);
if (physicalPlan.isQuery()) {
throw new QueryInBatchStatementException(statement);
}
@@ -649,7 +587,7 @@ public class TSServiceImpl implements TSIService.Iface {
String statement = req.getStatement();
PhysicalPlan physicalPlan =
processor.parseSQLToPhysicalPlan(
- statement, sessionIdZoneIdMap.get(req.getSessionId()), req.fetchSize);
+ statement, sessionManager.getZoneId(req.getSessionId()), req.fetchSize);
return physicalPlan.isQuery()
? internalExecuteQueryStatement(
@@ -658,7 +596,7 @@ public class TSServiceImpl implements TSIService.Iface {
physicalPlan,
req.fetchSize,
req.timeout,
- sessionIdUsernameMap.get(req.getSessionId()),
+ sessionManager.getUsername(req.getSessionId()),
req.isEnableRedirectQuery())
: executeUpdateStatement(physicalPlan, req.getSessionId());
} catch (InterruptedException e) {
@@ -680,7 +618,7 @@ public class TSServiceImpl implements TSIService.Iface {
String statement = req.getStatement();
PhysicalPlan physicalPlan =
processor.parseSQLToPhysicalPlan(
- statement, sessionIdZoneIdMap.get(req.getSessionId()), req.fetchSize);
+ statement, sessionManager.getZoneId(req.sessionId), req.fetchSize);
return physicalPlan.isQuery()
? internalExecuteQueryStatement(
@@ -689,7 +627,7 @@ public class TSServiceImpl implements TSIService.Iface {
physicalPlan,
req.fetchSize,
req.timeout,
- sessionIdUsernameMap.get(req.getSessionId()),
+ sessionManager.getUsername(req.getSessionId()),
req.isEnableRedirectQuery())
: RpcUtils.getTSExecuteStatementResp(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
@@ -712,7 +650,7 @@ public class TSServiceImpl implements TSIService.Iface {
}
PhysicalPlan physicalPlan =
- processor.rawDataQueryReqToPhysicalPlan(req, sessionIdZoneIdMap.get(req.getSessionId()));
+ processor.rawDataQueryReqToPhysicalPlan(req, sessionManager.getZoneId(req.sessionId));
return physicalPlan.isQuery()
? internalExecuteQueryStatement(
"",
@@ -720,7 +658,7 @@ public class TSServiceImpl implements TSIService.Iface {
physicalPlan,
req.fetchSize,
config.getQueryTimeoutThreshold(),
- sessionIdUsernameMap.get(req.getSessionId()),
+ sessionManager.getUsername(req.sessionId),
req.isEnableRedirectQuery())
: RpcUtils.getTSExecuteStatementResp(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
@@ -752,7 +690,8 @@ public class TSServiceImpl implements TSIService.Iface {
QueryFilterOptimizationException, MetadataException, IOException, InterruptedException,
TException, AuthException {
queryCount.incrementAndGet();
- AUDIT_LOGGER.debug("Session {} execute Query: {}", currSessionId.get(), statement);
+ AUDIT_LOGGER.debug(
+ "Session {} execute Query: {}", sessionManager.getCurrSessionId(), statement);
long startTime = System.currentTimeMillis();
long queryId = -1;
try {
@@ -762,7 +701,7 @@ public class TSServiceImpl implements TSIService.Iface {
fetchSize = p.left;
// generate the queryId for the operation
- queryId = generateQueryId(true, fetchSize, p.right);
+ queryId = sessionManager.requestQueryId(statementId, true, fetchSize, p.right);
// register query info to queryTimeManager
if (!(plan instanceof ShowQueryProcesslistPlan)) {
queryTimeManager.registerQuery(queryId, startTime, statement, timeout);
@@ -776,10 +715,6 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
- statementId2QueryId
- .computeIfAbsent(statementId, k -> new CopyOnWriteArraySet<>())
- .add(queryId);
-
if (plan instanceof AuthorPlan) {
plan.setLoginUserName(username);
}
@@ -870,7 +805,7 @@ public class TSServiceImpl implements TSIService.Iface {
}
return resp;
} catch (Exception e) {
- releaseQueryResourceNoExceptions(queryId);
+ sessionManager.releaseQueryResourceNoExceptions(queryId);
throw e;
} finally {
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, startTime);
@@ -1105,7 +1040,7 @@ public class TSServiceImpl implements TSIService.Iface {
return RpcUtils.getTSFetchResultsResp(TSStatusCode.NOT_LOGIN_ERROR);
}
- if (!queryId2DataSet.containsKey(req.queryId)) {
+ if (!sessionManager.hasDataset(req.queryId)) {
return RpcUtils.getTSFetchResultsResp(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "Has not executed query"));
}
@@ -1114,13 +1049,14 @@ public class TSServiceImpl implements TSIService.Iface {
queryTimeManager.registerQuery(
req.queryId, System.currentTimeMillis(), req.statement, req.timeout);
- QueryDataSet queryDataSet = queryId2DataSet.get(req.queryId);
+ QueryDataSet queryDataSet = sessionManager.getDataset(req.queryId);
if (req.isAlign) {
TSQueryDataSet result =
- fillRpcReturnData(req.fetchSize, queryDataSet, sessionIdUsernameMap.get(req.sessionId));
+ fillRpcReturnData(
+ req.fetchSize, queryDataSet, sessionManager.getUsername(req.sessionId));
boolean hasResultSet = result.bufferForTime().limit() != 0;
if (!hasResultSet) {
- releaseQueryResourceNoExceptions(req.queryId);
+ sessionManager.releaseQueryResourceNoExceptions(req.queryId);
}
TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
resp.setHasResultSet(hasResultSet);
@@ -1132,7 +1068,7 @@ public class TSServiceImpl implements TSIService.Iface {
} else {
TSQueryNonAlignDataSet nonAlignResult =
fillRpcNonAlignReturnData(
- req.fetchSize, queryDataSet, sessionIdUsernameMap.get(req.sessionId));
+ req.fetchSize, queryDataSet, sessionManager.getUsername(req.sessionId));
boolean hasResultSet = false;
for (ByteBuffer timeBuffer : nonAlignResult.getTimeList()) {
if (timeBuffer.limit() != 0) {
@@ -1141,7 +1077,7 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
if (!hasResultSet) {
- queryId2DataSet.remove(req.queryId);
+ sessionManager.removeDataset(req.queryId);
}
TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
resp.setHasResultSet(hasResultSet);
@@ -1158,7 +1094,7 @@ public class TSServiceImpl implements TSIService.Iface {
onNPEOrUnexpectedException(
e, "executing fetchResults", TSStatusCode.INTERNAL_SERVER_ERROR));
} catch (Exception e) {
- releaseQueryResourceNoExceptions(req.queryId);
+ sessionManager.releaseQueryResourceNoExceptions(req.queryId);
return RpcUtils.getTSFetchResultsResp(
onNPEOrUnexpectedException(
e, "executing fetchResults", TSStatusCode.INTERNAL_SERVER_ERROR));
@@ -1210,7 +1146,7 @@ public class TSServiceImpl implements TSIService.Iface {
QueryContext context = genQueryContext(queryId, physicalPlan.isDebug());
QueryDataSet queryDataSet = executor.processQuery(physicalPlan, context);
queryDataSet.setFetchSize(fetchSize);
- queryId2DataSet.put(queryId, queryDataSet);
+ sessionManager.setDataset(queryId, queryDataSet);
return queryDataSet;
}
@@ -1239,7 +1175,7 @@ public class TSServiceImpl implements TSIService.Iface {
status = executeNonQueryPlan(plan);
TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(status);
- long queryId = generateQueryId(false, DEFAULT_FETCH_SIZE, -1);
+ long queryId = sessionManager.requestQueryId(false, DEFAULT_FETCH_SIZE, -1);
return resp.setQueryId(queryId);
}
@@ -1256,7 +1192,7 @@ public class TSServiceImpl implements TSIService.Iface {
throws QueryProcessException {
PhysicalPlan physicalPlan =
processor.parseSQLToPhysicalPlan(
- statement, sessionIdZoneIdMap.get(sessionId), DEFAULT_FETCH_SIZE);
+ statement, sessionManager.getZoneId(sessionId), DEFAULT_FETCH_SIZE);
return physicalPlan.isQuery()
? RpcUtils.getTSExecuteStatementResp(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is a query statement.")
@@ -1269,7 +1205,7 @@ public class TSServiceImpl implements TSIService.Iface {
* @return true: If logged in; false: If not logged in
*/
private boolean checkLogin(long sessionId) {
- boolean isLoggedIn = sessionIdUsernameMap.get(sessionId) != null;
+ boolean isLoggedIn = sessionManager.getUsername(sessionId) != null;
if (!isLoggedIn) {
LOGGER.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
}
@@ -1286,7 +1222,7 @@ public class TSServiceImpl implements TSIService.Iface {
}
protected void handleClientExit() {
- Long sessionId = currSessionId.get();
+ Long sessionId = sessionManager.getCurrSessionId();
if (sessionId != null) {
TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
closeSession(req);
@@ -1296,7 +1232,7 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSGetTimeZoneResp getTimeZone(long sessionId) {
try {
- ZoneId zoneId = sessionIdZoneIdMap.get(sessionId);
+ ZoneId zoneId = sessionManager.getZoneId(sessionId);
return new TSGetTimeZoneResp(
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
zoneId != null ? zoneId.toString() : "Unknown time zone");
@@ -1311,7 +1247,7 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public TSStatus setTimeZone(TSSetTimeZoneReq req) {
try {
- sessionIdZoneIdMap.put(req.getSessionId(), ZoneId.of(req.getTimeZone()));
+ sessionManager.setTimezone(req.sessionId, req.timeZone);
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (Exception e) {
return onNPEOrUnexpectedException(e, "setting time zone", TSStatusCode.SET_TIME_ZONE_ERROR);
@@ -1340,7 +1276,7 @@ public class TSServiceImpl implements TSIService.Iface {
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, first device {}, first time {}",
- currSessionId.get(),
+ sessionManager.getCurrSessionId(),
req.deviceIds.get(0),
req.getTimestamps().get(0));
}
@@ -1406,7 +1342,7 @@ public class TSServiceImpl implements TSIService.Iface {
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, device {}, first time {}",
- currSessionId.get(),
+ sessionManager.getCurrSessionId(),
req.deviceId,
req.getTimestamps().get(0));
}
@@ -1448,7 +1384,7 @@ public class TSServiceImpl implements TSIService.Iface {
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session {} insertRecords, first device {}, first time {}",
- currSessionId.get(),
+ sessionManager.getCurrSessionId(),
req.deviceIds.get(0),
req.getTimestamps().get(0));
}
@@ -1554,7 +1490,7 @@ public class TSServiceImpl implements TSIService.Iface {
AUDIT_LOGGER.debug(
"Session {} insertRecord, device {}, time {}",
- currSessionId.get(),
+ sessionManager.getCurrSessionId(),
req.getDeviceId(),
req.getTimestamp());
@@ -1582,7 +1518,7 @@ public class TSServiceImpl implements TSIService.Iface {
AUDIT_LOGGER.debug(
"Session {} insertRecord, device {}, time {}",
- currSessionId.get(),
+ sessionManager.getCurrSessionId(),
req.getDeviceId(),
req.getTimestamp());
@@ -1753,7 +1689,8 @@ public class TSServiceImpl implements TSIService.Iface {
}
if (AUDIT_LOGGER.isDebugEnabled()) {
- AUDIT_LOGGER.debug("Session-{} create timeseries {}", currSessionId.get(), req.getPath());
+ AUDIT_LOGGER.debug(
+ "Session-{} create timeseries {}", sessionManager.getCurrSessionId(), req.getPath());
}
CreateTimeSeriesPlan plan =
@@ -1786,7 +1723,7 @@ public class TSServiceImpl implements TSIService.Iface {
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create {} timeseries, the first is {}",
- currSessionId.get(),
+ sessionManager.getCurrSessionId(),
req.getPaths().size(),
req.getPaths().get(0));
}
@@ -1882,11 +1819,7 @@ public class TSServiceImpl implements TSIService.Iface {
@Override
public long requestStatementId(long sessionId) {
- long statementId = statementIdGenerator.incrementAndGet();
- sessionId2StatementId
- .computeIfAbsent(sessionId, s -> new CopyOnWriteArraySet<>())
- .add(statementId);
- return statementId;
+ return sessionManager.requestStatementId(sessionId);
}
@Override
@@ -1899,7 +1832,7 @@ public class TSServiceImpl implements TSIService.Iface {
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} create device template {}.{}.{}.{}.{}.{}",
- currSessionId.get(),
+ sessionManager.getCurrSessionId(),
req.getName(),
req.getSchemaNames(),
req.getMeasurements(),
@@ -1957,7 +1890,7 @@ public class TSServiceImpl implements TSIService.Iface {
if (AUDIT_LOGGER.isDebugEnabled()) {
AUDIT_LOGGER.debug(
"Session-{} set device template {}.{}",
- currSessionId.get(),
+ sessionManager.getCurrSessionId(),
req.getTemplateName(),
req.getPrefixPath());
}
@@ -1971,7 +1904,7 @@ public class TSServiceImpl implements TSIService.Iface {
private TSStatus checkAuthority(PhysicalPlan plan, long sessionId) {
List<PartialPath> paths = plan.getPaths();
try {
- if (!checkAuthorization(paths, plan, sessionIdUsernameMap.get(sessionId))) {
+ if (!checkAuthorization(paths, plan, sessionManager.getUsername(sessionId))) {
return RpcUtils.getStatus(
TSStatusCode.NO_PERMISSION_ERROR,
"No permissions for this operation " + plan.getOperatorType());
@@ -2000,11 +1933,6 @@ public class TSServiceImpl implements TSIService.Iface {
: RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
}
- private long generateQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) {
- return QueryResourceManager.getInstance()
- .assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum);
- }
-
protected List<TSDataType> getSeriesTypesByPaths(
List<PartialPath> paths, List<String> aggregations) throws MetadataException {
return SchemaUtils.getSeriesTypesByPaths(paths, aggregations);
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByMonthIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByMonthIT.java
index 76e0dba..6292967 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByMonthIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByMonthIT.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.jdbc.IoTDBConnection;
import org.junit.After;
import org.junit.Assert;
@@ -81,6 +82,7 @@ public class IoTDBGroupByMonthIT {
"02/28/2021:00:00:00", "1.0"
};
+ ((IoTDBConnection) connection).setTimeZone("GMT+00:00");
boolean hasResultSet =
statement.execute(
"select sum(temperature) from root.sg1.d1 "
@@ -121,6 +123,7 @@ public class IoTDBGroupByMonthIT {
"02/28/2021:00:00:00", "1.0"
};
+ ((IoTDBConnection) connection).setTimeZone("GMT+00:00");
boolean hasResultSet =
statement.execute(
"select sum(temperature) from root.sg1.d1 "
@@ -153,6 +156,7 @@ public class IoTDBGroupByMonthIT {
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
+ ((IoTDBConnection) connection).setTimeZone("GMT+00:00");
boolean hasResultSet =
statement.execute(
"select sum(temperature) from root.sg1.d1 "
@@ -186,6 +190,7 @@ public class IoTDBGroupByMonthIT {
"02/28/2021:00:00:00", "31.0"
};
+ ((IoTDBConnection) connection).setTimeZone("GMT+00:00");
boolean hasResultSet =
statement.execute(
"select sum(temperature) from root.sg1.d1 GROUP BY ([1612051200000, 1617148800000), 1mo)");
@@ -231,6 +236,7 @@ public class IoTDBGroupByMonthIT {
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
+ ((IoTDBConnection) connection).setTimeZone("GMT+00:00");
boolean hasResultSet =
statement.execute(
"select sum(temperature) from root.sg1.d1 GROUP BY ([now() - 1mo, now()), 1d)");
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilter.java
index df64437..213d520 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilter.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.util.Calendar;
import java.util.Objects;
+import java.util.TimeZone;
/**
* GroupByMonthFilter is used to handle natural month slidingStep and interval by generating
@@ -44,9 +45,11 @@ public class GroupByMonthFilter extends GroupByFilter {
long startTime,
long endTime,
boolean isSlidingStepByMonth,
- boolean isIntervalByMonth) {
+ boolean isIntervalByMonth,
+ TimeZone timeZone) {
super(interval, slidingStep, startTime, endTime);
initialStartTime = startTime;
+ calendar.setTimeZone(timeZone);
calendar.setTimeInMillis(startTime);
this.isIntervalByMonth = isIntervalByMonth;
this.isSlidingStepByMonth = isSlidingStepByMonth;
@@ -68,6 +71,7 @@ public class GroupByMonthFilter extends GroupByFilter {
slidingStepsInMo = filter.slidingStepsInMo;
initialStartTime = filter.initialStartTime;
calendar = Calendar.getInstance();
+ calendar.setTimeZone(filter.calendar.getTimeZone());
calendar.setTimeInMillis(filter.calendar.getTimeInMillis());
}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilterTest.java
index d816090..a374c74 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/filter/GroupByMonthFilterTest.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.junit.Test;
+import java.util.TimeZone;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -41,7 +43,8 @@ public class GroupByMonthFilterTest {
@Test
public void TestSatisfy1() {
GroupByMonthFilter filter =
- new GroupByMonthFilter(MS_TO_MONTH, 2 * MS_TO_MONTH, 0, END_TIME, true, true);
+ new GroupByMonthFilter(
+ MS_TO_MONTH, 2 * MS_TO_MONTH, 0, END_TIME, true, true, TimeZone.getTimeZone("+08:00"));
// 1970-01-01 08:00:00, timezone = GMT+08:00
assertTrue(filter.satisfy(0, null));
@@ -75,7 +78,8 @@ public class GroupByMonthFilterTest {
@Test
public void TestSatisfy2() {
GroupByMonthFilter filter =
- new GroupByMonthFilter(MS_TO_MONTH, MS_TO_MONTH, 0, END_TIME, true, true);
+ new GroupByMonthFilter(
+ MS_TO_MONTH, MS_TO_MONTH, 0, END_TIME, true, true, TimeZone.getTimeZone("+08:00"));
// 1970-01-01 08:00:00, timezone = GMT+08:00
assertTrue(filter.satisfy(0, null));
@@ -106,7 +110,8 @@ public class GroupByMonthFilterTest {
@Test
public void TestSatisfy3() {
GroupByMonthFilter filter =
- new GroupByMonthFilter(MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false);
+ new GroupByMonthFilter(
+ MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false, TimeZone.getTimeZone("+08:00"));
// 1970-01-01 08:00:00, timezone = GMT+08:00
assertTrue(filter.satisfy(0, null));
@@ -137,7 +142,8 @@ public class GroupByMonthFilterTest {
@Test
public void TestSatisfy4() {
GroupByMonthFilter filter =
- new GroupByMonthFilter(MS_TO_MONTH, MS_TO_DAY * 100, 0, END_TIME, false, true);
+ new GroupByMonthFilter(
+ MS_TO_MONTH, MS_TO_DAY * 100, 0, END_TIME, false, true, TimeZone.getTimeZone("+08:00"));
// 1970-01-01 08:00:00, timezone = GMT+08:00
assertTrue(filter.satisfy(0, null));
@@ -156,7 +162,8 @@ public class GroupByMonthFilterTest {
@Test
public void TestSatisfyStartEndTime() {
GroupByMonthFilter filter =
- new GroupByMonthFilter(MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false);
+ new GroupByMonthFilter(
+ MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false, TimeZone.getTimeZone("+08:00"));
// 1970-01-01 08:00:00 - 1970-01-02 08:00:00, timezone = GMT+08:00
Statistics statistics = new LongStatistics();
@@ -202,7 +209,8 @@ public class GroupByMonthFilterTest {
@Test
public void TestContainStartEndTime() {
GroupByMonthFilter filter =
- new GroupByMonthFilter(MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false);
+ new GroupByMonthFilter(
+ MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false, TimeZone.getTimeZone("+08:00"));
// 1970-01-01 08:00:00 - 1970-01-02 08:00:00, timezone = GMT+08:00
assertFalse(filter.containStartEndTime(0, MS_TO_DAY));
@@ -238,11 +246,13 @@ public class GroupByMonthFilterTest {
@Test
public void TestEquals() {
GroupByMonthFilter filter =
- new GroupByMonthFilter(MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false);
+ new GroupByMonthFilter(
+ MS_TO_DAY, MS_TO_MONTH, 0, END_TIME, true, false, TimeZone.getTimeZone("+08:00"));
Filter filter2 = filter.copy();
assertEquals(filter, filter2);
GroupByMonthFilter filter3 =
- new GroupByMonthFilter(MS_TO_MONTH, MS_TO_MONTH, 0, END_TIME, true, true);
+ new GroupByMonthFilter(
+ MS_TO_MONTH, MS_TO_MONTH, 0, END_TIME, true, true, TimeZone.getTimeZone("+08:00"));
assertNotEquals(filter, filter3);
}
}