You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/29 09:21:15 UTC
[iotdb] 04/05: implement session
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/querySession
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2f373dfb0825ce1a481486dc62556728580a1184
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Thu Dec 29 16:06:50 2022 +0800
implement session
---
.../main/java/org/apache/iotdb/SessionExample.java | 86 +++++++++-------
.../java/org/apache/iotdb/isession/ISession.java | 12 ++-
.../apache/iotdb/isession/pool/ISessionPool.java | 19 ++--
.../apache/iotdb/isession/util/Aggregation.java | 5 +
session/pom.xml | 6 ++
.../java/org/apache/iotdb/session/Session.java | 87 ++++++++++++++--
.../apache/iotdb/session/SessionConnection.java | 109 +++++++++++++++++++++
.../org/apache/iotdb/session/pool/SessionPool.java | 92 +++++++++++++++--
8 files changed, 350 insertions(+), 66 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 3f77f67324..085bf00f74 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -22,10 +22,10 @@ package org.apache.iotdb;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.isession.SessionDataSet.DataIterator;
import org.apache.iotdb.isession.template.Template;
+import org.apache.iotdb.isession.util.Aggregation;
import org.apache.iotdb.isession.util.Version;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.template.MeasurementNode;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -73,45 +73,57 @@ public class SessionExample {
// set session fetchSize
session.setFetchSize(10000);
- try {
- session.createDatabase("root.sg1");
- } catch (StatementExecutionException e) {
- if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()) {
- throw e;
+ try (SessionDataSet dataSet =
+ session.executeAggregationQuery(
+ Collections.singletonList("root.sg1.d1.*"),
+ Collections.singletonList(Aggregation.COUNT))) {
+
+ System.out.println(dataSet.getColumnNames());
+ dataSet.setFetchSize(1024);
+ while (dataSet.hasNext()) {
+ System.out.println(dataSet.next());
}
}
- // createTemplate();
- createTimeseries();
- createMultiTimeseries();
- insertRecord();
- insertTablet();
- // insertTabletWithNullValues();
- // insertTablets();
- // insertRecords();
- // insertText();
- // selectInto();
- // createAndDropContinuousQueries();
- // nonQuery();
- // query();
- // queryWithTimeout();
- // rawDataQuery();
- // lastDataQuery();
- // queryByIterator();
- // deleteData();
- // deleteTimeseries();
- // setTimeout();
-
- sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
- sessionEnableRedirect.setEnableQueryRedirection(true);
- sessionEnableRedirect.open(false);
-
- // set session fetchSize
- sessionEnableRedirect.setFetchSize(10000);
-
- insertRecord4Redirect();
- query4Redirect();
- sessionEnableRedirect.close();
+ // try {
+ // session.createDatabase("root.sg1");
+ // } catch (StatementExecutionException e) {
+ // if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()) {
+ // throw e;
+ // }
+ // }
+ //
+ // // createTemplate();
+ // createTimeseries();
+ // createMultiTimeseries();
+ // insertRecord();
+ // insertTablet();
+ // // insertTabletWithNullValues();
+ // // insertTablets();
+ // // insertRecords();
+ // // insertText();
+ // // selectInto();
+ // // createAndDropContinuousQueries();
+ // // nonQuery();
+ // // query();
+ // // queryWithTimeout();
+ // // rawDataQuery();
+ // // lastDataQuery();
+ // // queryByIterator();
+ // // deleteData();
+ // // deleteTimeseries();
+ // // setTimeout();
+ //
+ // sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
+ // sessionEnableRedirect.setEnableQueryRedirection(true);
+ // sessionEnableRedirect.open(false);
+ //
+ // // set session fetchSize
+ // sessionEnableRedirect.setFetchSize(10000);
+ //
+ // insertRecord4Redirect();
+ // query4Redirect();
+ // sessionEnableRedirect.close();
session.close();
}
diff --git a/isession/src/main/java/org/apache/iotdb/isession/ISession.java b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 3b26c5a89e..6a911876bc 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -158,17 +158,20 @@ public interface ISession extends AutoCloseable {
SessionDataSet executeLastDataQuery(List<String> paths)
throws StatementExecutionException, IoTDBConnectionException;
- SessionDataSet executeAggregationQuery(List<String> paths, List<Aggregation> aggregations);
+ SessionDataSet executeAggregationQuery(List<String> paths, List<Aggregation> aggregations)
+ throws StatementExecutionException, IoTDBConnectionException;
SessionDataSet executeAggregationQuery(
- List<String> paths, List<Aggregation> aggregations, long startTime, long endTime);
+ List<String> paths, List<Aggregation> aggregations, long startTime, long endTime)
+ throws StatementExecutionException, IoTDBConnectionException;
SessionDataSet executeAggregationQuery(
List<String> paths,
List<Aggregation> aggregations,
long startTime,
long endTime,
- long interval);
+ long interval)
+ throws StatementExecutionException, IoTDBConnectionException;
SessionDataSet executeAggregationQuery(
List<String> paths,
@@ -176,7 +179,8 @@ public interface ISession extends AutoCloseable {
long startTime,
long endTime,
long interval,
- long slidingStep);
+ long slidingStep)
+ throws StatementExecutionException, IoTDBConnectionException;
void insertRecord(
String deviceId,
diff --git a/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java b/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
index 0367b72b97..536c2b3fef 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.isession.pool;
-import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.isession.template.Template;
import org.apache.iotdb.isession.util.Aggregation;
import org.apache.iotdb.isession.util.SystemStatus;
@@ -419,25 +418,29 @@ public interface ISessionPool {
SessionDataSetWrapper executeLastDataQuery(List<String> paths)
throws StatementExecutionException, IoTDBConnectionException;
- SessionDataSet executeAggregationQuery(List<String> paths, List<Aggregation> aggregations);
+ SessionDataSetWrapper executeAggregationQuery(List<String> paths, List<Aggregation> aggregations)
+ throws StatementExecutionException, IoTDBConnectionException;
- SessionDataSet executeAggregationQuery(
- List<String> paths, List<Aggregation> aggregations, long startTime, long endTime);
+ SessionDataSetWrapper executeAggregationQuery(
+ List<String> paths, List<Aggregation> aggregations, long startTime, long endTime)
+ throws StatementExecutionException, IoTDBConnectionException;
- SessionDataSet executeAggregationQuery(
+ SessionDataSetWrapper executeAggregationQuery(
List<String> paths,
List<Aggregation> aggregations,
long startTime,
long endTime,
- long interval);
+ long interval)
+ throws StatementExecutionException, IoTDBConnectionException;
- SessionDataSet executeAggregationQuery(
+ SessionDataSetWrapper executeAggregationQuery(
List<String> paths,
List<Aggregation> aggregations,
long startTime,
long endTime,
long interval,
- long slidingStep);
+ long slidingStep)
+ throws StatementExecutionException, IoTDBConnectionException;
int getMaxSize();
diff --git a/isession/src/main/java/org/apache/iotdb/isession/util/Aggregation.java b/isession/src/main/java/org/apache/iotdb/isession/util/Aggregation.java
index 264032cf02..8d612c2b04 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/util/Aggregation.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/util/Aggregation.java
@@ -30,4 +30,9 @@ public enum Aggregation {
MAX_VALUE,
MIN_VALUE,
EXTREME;
+
+ @Override
+ public String toString() {
+ return name().toLowerCase();
+ }
}
diff --git a/session/pom.xml b/session/pom.xml
index 7097aba703..365a8bfd78 100644
--- a/session/pom.xml
+++ b/session/pom.xml
@@ -87,6 +87,12 @@
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
+ <!-- antlr -->
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-antlr</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<profiles>
<profile>
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 196b98f2ef..354eac731e 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -839,15 +839,48 @@ public class Session implements ISession {
}
@Override
- public SessionDataSet executeAggregationQuery(
- List<String> paths, List<Aggregation> aggregations) {
- return null;
+ public SessionDataSet executeAggregationQuery(List<String> paths, List<Aggregation> aggregations)
+ throws StatementExecutionException, IoTDBConnectionException {
+ try {
+ return defaultSessionConnection.executeAggregationQuery(paths, aggregations);
+ } catch (RedirectException e) {
+ handleQueryRedirection(e.getEndPoint());
+ if (enableQueryRedirection) {
+ // retry
+ try {
+ return defaultSessionConnection.executeAggregationQuery(paths, aggregations);
+ } catch (RedirectException redirectException) {
+ logger.error("redirect twice", redirectException);
+ throw new StatementExecutionException("redirect twice, please try again.");
+ }
+ } else {
+ throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
+ }
+ }
}
@Override
public SessionDataSet executeAggregationQuery(
- List<String> paths, List<Aggregation> aggregations, long startTime, long endTime) {
- return null;
+ List<String> paths, List<Aggregation> aggregations, long startTime, long endTime)
+ throws StatementExecutionException, IoTDBConnectionException {
+ try {
+ return defaultSessionConnection.executeAggregationQuery(
+ paths, aggregations, startTime, endTime);
+ } catch (RedirectException e) {
+ handleQueryRedirection(e.getEndPoint());
+ if (enableQueryRedirection) {
+ // retry
+ try {
+ return defaultSessionConnection.executeAggregationQuery(
+ paths, aggregations, startTime, endTime);
+ } catch (RedirectException redirectException) {
+ logger.error("redirect twice", redirectException);
+ throw new StatementExecutionException("redirect twice, please try again.");
+ }
+ } else {
+ throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
+ }
+ }
}
@Override
@@ -856,8 +889,26 @@ public class Session implements ISession {
List<Aggregation> aggregations,
long startTime,
long endTime,
- long interval) {
- return null;
+ long interval)
+ throws StatementExecutionException, IoTDBConnectionException {
+ try {
+ return defaultSessionConnection.executeAggregationQuery(
+ paths, aggregations, startTime, endTime, interval);
+ } catch (RedirectException e) {
+ handleQueryRedirection(e.getEndPoint());
+ if (enableQueryRedirection) {
+ // retry
+ try {
+ return defaultSessionConnection.executeAggregationQuery(
+ paths, aggregations, startTime, endTime, interval);
+ } catch (RedirectException redirectException) {
+ logger.error("redirect twice", redirectException);
+ throw new StatementExecutionException("redirect twice, please try again.");
+ }
+ } else {
+ throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
+ }
+ }
}
@Override
@@ -867,8 +918,26 @@ public class Session implements ISession {
long startTime,
long endTime,
long interval,
- long slidingStep) {
- return null;
+ long slidingStep)
+ throws StatementExecutionException, IoTDBConnectionException {
+ try {
+ return defaultSessionConnection.executeAggregationQuery(
+ paths, aggregations, startTime, endTime, interval, slidingStep);
+ } catch (RedirectException e) {
+ handleQueryRedirection(e.getEndPoint());
+ if (enableQueryRedirection) {
+ // retry
+ try {
+ return defaultSessionConnection.executeAggregationQuery(
+ paths, aggregations, startTime, endTime, interval, slidingStep);
+ } catch (RedirectException redirectException) {
+ logger.error("redirect twice", redirectException);
+ throw new StatementExecutionException("redirect twice, please try again.");
+ }
+ } else {
+ throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
+ }
+ }
}
/**
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 8f6d9306f2..a2282ad80c 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -20,15 +20,18 @@
package org.apache.iotdb.session;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TPartialPath;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.isession.util.Aggregation;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
@@ -60,6 +63,7 @@ import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
import org.apache.iotdb.session.util.SessionUtils;
+import org.apache.iotdb.tsfile.read.common.parser.PathNodesGenerator;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -71,9 +75,11 @@ import org.slf4j.LoggerFactory;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.StringJoiner;
+import java.util.stream.Collectors;
public class SessionConnection {
@@ -487,6 +493,109 @@ public class SessionConnection {
tsExecuteStatementResp.moreData);
}
+ protected SessionDataSet executeAggregationQuery(
+ List<String> paths, List<Aggregation> aggregations)
+ throws StatementExecutionException, IoTDBConnectionException, RedirectException {
+ TSAggregationQueryReq req = createAggregationQueryReq(paths, aggregations);
+ return executeAggregationQuery(req);
+ }
+
+ protected SessionDataSet executeAggregationQuery(
+ List<String> paths, List<Aggregation> aggregations, long startTime, long endTime)
+ throws StatementExecutionException, IoTDBConnectionException, RedirectException {
+ TSAggregationQueryReq req = createAggregationQueryReq(paths, aggregations);
+ req.setStartTime(startTime);
+ req.setEndTime(endTime);
+ return executeAggregationQuery(req);
+ }
+
+ protected SessionDataSet executeAggregationQuery(
+ List<String> paths,
+ List<Aggregation> aggregations,
+ long startTime,
+ long endTime,
+ long interval)
+ throws StatementExecutionException, IoTDBConnectionException, RedirectException {
+ TSAggregationQueryReq req = createAggregationQueryReq(paths, aggregations);
+ req.setStartTime(startTime);
+ req.setEndTime(endTime);
+ req.setInterval(interval);
+ return executeAggregationQuery(req);
+ }
+
+ protected SessionDataSet executeAggregationQuery(
+ List<String> paths,
+ List<Aggregation> aggregations,
+ long startTime,
+ long endTime,
+ long interval,
+ long slidingStep)
+ throws StatementExecutionException, IoTDBConnectionException, RedirectException {
+ TSAggregationQueryReq req = createAggregationQueryReq(paths, aggregations);
+ req.setStartTime(startTime);
+ req.setEndTime(endTime);
+ req.setInterval(interval);
+ req.setSlidingStep(slidingStep);
+ return executeAggregationQuery(req);
+ }
+
+ private SessionDataSet executeAggregationQuery(TSAggregationQueryReq tsAggregationQueryReq)
+ throws StatementExecutionException, IoTDBConnectionException, RedirectException {
+ TSExecuteStatementResp tsExecuteStatementResp;
+ try {
+ tsExecuteStatementResp = client.executeAggregationQuery(tsAggregationQueryReq);
+ RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
+ } catch (TException e) {
+ if (reconnect()) {
+ try {
+ tsAggregationQueryReq.setSessionId(sessionId);
+ tsAggregationQueryReq.setStatementId(statementId);
+ tsExecuteStatementResp = client.executeAggregationQuery(tsAggregationQueryReq);
+ } catch (TException tException) {
+ throw new IoTDBConnectionException(tException);
+ }
+ } else {
+ throw new IoTDBConnectionException(logForReconnectionFailure());
+ }
+ }
+
+ RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
+ return new SessionDataSet(
+ "",
+ tsExecuteStatementResp.getColumns(),
+ tsExecuteStatementResp.getDataTypeList(),
+ tsExecuteStatementResp.columnNameIndexMap,
+ tsExecuteStatementResp.getQueryId(),
+ statementId,
+ client,
+ sessionId,
+ tsExecuteStatementResp.queryResult,
+ tsExecuteStatementResp.isIgnoreTimeStamp(),
+ tsExecuteStatementResp.moreData);
+ }
+
+ private List<TPartialPath> convertToPartialPaths(List<String> paths) {
+ List<TPartialPath> selectPaths = new ArrayList<>();
+ for (String pathStr : paths) {
+ selectPaths.add(
+ new TPartialPath(Arrays.asList(PathNodesGenerator.splitPathToNodes(pathStr))));
+ }
+ return selectPaths;
+ }
+
+ private TSAggregationQueryReq createAggregationQueryReq(
+ List<String> paths, List<Aggregation> aggregations) {
+ TSAggregationQueryReq req =
+ new TSAggregationQueryReq(
+ sessionId,
+ statementId,
+ convertToPartialPaths(paths),
+ aggregations.stream().map(Enum::toString).collect(Collectors.toList()));
+ req.setFetchSize(session.getFetchSize());
+ req.setTimeout(session.getQueryTimeout());
+ return req;
+ }
+
protected void insertRecord(TSInsertRecordReq request)
throws IoTDBConnectionException, StatementExecutionException, RedirectException {
request.setSessionId(sessionId);
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 89590cb81f..be024bae5e 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -2556,35 +2556,111 @@ public class SessionPool implements ISessionPool {
}
@Override
- public SessionDataSet executeAggregationQuery(
- List<String> paths, List<Aggregation> aggregations) {
+ public SessionDataSetWrapper executeAggregationQuery(
+ List<String> paths, List<Aggregation> aggregations)
+ throws StatementExecutionException, IoTDBConnectionException {
+ for (int i = 0; i < RETRY; i++) {
+ ISession session = getSession();
+ try {
+ SessionDataSet resp = session.executeAggregationQuery(paths, aggregations);
+ SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+ occupy(session);
+ return wrapper;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ logger.warn("executeAggregationQuery failed", e);
+ cleanSessionAndMayThrowConnectionException(session, i, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ // never go here
return null;
}
@Override
- public SessionDataSet executeAggregationQuery(
- List<String> paths, List<Aggregation> aggregations, long startTime, long endTime) {
+ public SessionDataSetWrapper executeAggregationQuery(
+ List<String> paths, List<Aggregation> aggregations, long startTime, long endTime)
+ throws StatementExecutionException, IoTDBConnectionException {
+ for (int i = 0; i < RETRY; i++) {
+ ISession session = getSession();
+ try {
+ SessionDataSet resp =
+ session.executeAggregationQuery(paths, aggregations, startTime, endTime);
+ SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+ occupy(session);
+ return wrapper;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ logger.warn("executeAggregationQuery failed", e);
+ cleanSessionAndMayThrowConnectionException(session, i, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ // never go here
return null;
}
@Override
- public SessionDataSet executeAggregationQuery(
+ public SessionDataSetWrapper executeAggregationQuery(
List<String> paths,
List<Aggregation> aggregations,
long startTime,
long endTime,
- long interval) {
+ long interval)
+ throws StatementExecutionException, IoTDBConnectionException {
+ for (int i = 0; i < RETRY; i++) {
+ ISession session = getSession();
+ try {
+ SessionDataSet resp =
+ session.executeAggregationQuery(paths, aggregations, startTime, endTime, interval);
+ SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+ occupy(session);
+ return wrapper;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ logger.warn("executeAggregationQuery failed", e);
+ cleanSessionAndMayThrowConnectionException(session, i, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ // never go here
return null;
}
@Override
- public SessionDataSet executeAggregationQuery(
+ public SessionDataSetWrapper executeAggregationQuery(
List<String> paths,
List<Aggregation> aggregations,
long startTime,
long endTime,
long interval,
- long slidingStep) {
+ long slidingStep)
+ throws StatementExecutionException, IoTDBConnectionException {
+ for (int i = 0; i < RETRY; i++) {
+ ISession session = getSession();
+ try {
+ SessionDataSet resp =
+ session.executeAggregationQuery(
+ paths, aggregations, startTime, endTime, interval, slidingStep);
+ SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+ occupy(session);
+ return wrapper;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ logger.warn("executeAggregationQuery failed", e);
+ cleanSessionAndMayThrowConnectionException(session, i, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ // never go here
return null;
}