You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/29 09:21:15 UTC

[iotdb] 04/05: implement session

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/querySession
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2f373dfb0825ce1a481486dc62556728580a1184
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Thu Dec 29 16:06:50 2022 +0800

    implement session
---
 .../main/java/org/apache/iotdb/SessionExample.java |  86 +++++++++-------
 .../java/org/apache/iotdb/isession/ISession.java   |  12 ++-
 .../apache/iotdb/isession/pool/ISessionPool.java   |  19 ++--
 .../apache/iotdb/isession/util/Aggregation.java    |   5 +
 session/pom.xml                                    |   6 ++
 .../java/org/apache/iotdb/session/Session.java     |  87 ++++++++++++++--
 .../apache/iotdb/session/SessionConnection.java    | 109 +++++++++++++++++++++
 .../org/apache/iotdb/session/pool/SessionPool.java |  92 +++++++++++++++--
 8 files changed, 350 insertions(+), 66 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 3f77f67324..085bf00f74 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -22,10 +22,10 @@ package org.apache.iotdb;
 import org.apache.iotdb.isession.SessionDataSet;
 import org.apache.iotdb.isession.SessionDataSet.DataIterator;
 import org.apache.iotdb.isession.template.Template;
+import org.apache.iotdb.isession.util.Aggregation;
 import org.apache.iotdb.isession.util.Version;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.session.template.MeasurementNode;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -73,45 +73,57 @@ public class SessionExample {
     // set session fetchSize
     session.setFetchSize(10000);
 
-    try {
-      session.createDatabase("root.sg1");
-    } catch (StatementExecutionException e) {
-      if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()) {
-        throw e;
+    try (SessionDataSet dataSet =
+        session.executeAggregationQuery(
+            Collections.singletonList("root.sg1.d1.*"),
+            Collections.singletonList(Aggregation.COUNT))) {
+
+      System.out.println(dataSet.getColumnNames());
+      dataSet.setFetchSize(1024);
+      while (dataSet.hasNext()) {
+        System.out.println(dataSet.next());
       }
     }
 
-    //     createTemplate();
-    createTimeseries();
-    createMultiTimeseries();
-    insertRecord();
-    insertTablet();
-    //    insertTabletWithNullValues();
-    //    insertTablets();
-    //    insertRecords();
-    //    insertText();
-    //    selectInto();
-    //    createAndDropContinuousQueries();
-    //    nonQuery();
-    //    query();
-    //    queryWithTimeout();
-    //    rawDataQuery();
-    //    lastDataQuery();
-    //    queryByIterator();
-    //    deleteData();
-    //    deleteTimeseries();
-    //    setTimeout();
-
-    sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
-    sessionEnableRedirect.setEnableQueryRedirection(true);
-    sessionEnableRedirect.open(false);
-
-    // set session fetchSize
-    sessionEnableRedirect.setFetchSize(10000);
-
-    insertRecord4Redirect();
-    query4Redirect();
-    sessionEnableRedirect.close();
+    //    try {
+    //      session.createDatabase("root.sg1");
+    //    } catch (StatementExecutionException e) {
+    //      if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()) {
+    //        throw e;
+    //      }
+    //    }
+    //
+    //    //     createTemplate();
+    //    createTimeseries();
+    //    createMultiTimeseries();
+    //    insertRecord();
+    //    insertTablet();
+    //    //    insertTabletWithNullValues();
+    //    //    insertTablets();
+    //    //    insertRecords();
+    //    //    insertText();
+    //    //    selectInto();
+    //    //    createAndDropContinuousQueries();
+    //    //    nonQuery();
+    //    //    query();
+    //    //    queryWithTimeout();
+    //    //    rawDataQuery();
+    //    //    lastDataQuery();
+    //    //    queryByIterator();
+    //    //    deleteData();
+    //    //    deleteTimeseries();
+    //    //    setTimeout();
+    //
+    //    sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
+    //    sessionEnableRedirect.setEnableQueryRedirection(true);
+    //    sessionEnableRedirect.open(false);
+    //
+    //    // set session fetchSize
+    //    sessionEnableRedirect.setFetchSize(10000);
+    //
+    //    insertRecord4Redirect();
+    //    query4Redirect();
+    //    sessionEnableRedirect.close();
     session.close();
   }
 
diff --git a/isession/src/main/java/org/apache/iotdb/isession/ISession.java b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 3b26c5a89e..6a911876bc 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -158,17 +158,20 @@ public interface ISession extends AutoCloseable {
   SessionDataSet executeLastDataQuery(List<String> paths)
       throws StatementExecutionException, IoTDBConnectionException;
 
-  SessionDataSet executeAggregationQuery(List<String> paths, List<Aggregation> aggregations);
+  SessionDataSet executeAggregationQuery(List<String> paths, List<Aggregation> aggregations)
+      throws StatementExecutionException, IoTDBConnectionException;
 
   SessionDataSet executeAggregationQuery(
-      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime);
+      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime)
+      throws StatementExecutionException, IoTDBConnectionException;
 
   SessionDataSet executeAggregationQuery(
       List<String> paths,
       List<Aggregation> aggregations,
       long startTime,
       long endTime,
-      long interval);
+      long interval)
+      throws StatementExecutionException, IoTDBConnectionException;
 
   SessionDataSet executeAggregationQuery(
       List<String> paths,
@@ -176,7 +179,8 @@ public interface ISession extends AutoCloseable {
       long startTime,
       long endTime,
       long interval,
-      long slidingStep);
+      long slidingStep)
+      throws StatementExecutionException, IoTDBConnectionException;
 
   void insertRecord(
       String deviceId,
diff --git a/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java b/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
index 0367b72b97..536c2b3fef 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.isession.pool;
 
-import org.apache.iotdb.isession.SessionDataSet;
 import org.apache.iotdb.isession.template.Template;
 import org.apache.iotdb.isession.util.Aggregation;
 import org.apache.iotdb.isession.util.SystemStatus;
@@ -419,25 +418,29 @@ public interface ISessionPool {
   SessionDataSetWrapper executeLastDataQuery(List<String> paths)
       throws StatementExecutionException, IoTDBConnectionException;
 
-  SessionDataSet executeAggregationQuery(List<String> paths, List<Aggregation> aggregations);
+  SessionDataSetWrapper executeAggregationQuery(List<String> paths, List<Aggregation> aggregations)
+      throws StatementExecutionException, IoTDBConnectionException;
 
-  SessionDataSet executeAggregationQuery(
-      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime);
+  SessionDataSetWrapper executeAggregationQuery(
+      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime)
+      throws StatementExecutionException, IoTDBConnectionException;
 
-  SessionDataSet executeAggregationQuery(
+  SessionDataSetWrapper executeAggregationQuery(
       List<String> paths,
       List<Aggregation> aggregations,
       long startTime,
       long endTime,
-      long interval);
+      long interval)
+      throws StatementExecutionException, IoTDBConnectionException;
 
-  SessionDataSet executeAggregationQuery(
+  SessionDataSetWrapper executeAggregationQuery(
       List<String> paths,
       List<Aggregation> aggregations,
       long startTime,
       long endTime,
       long interval,
-      long slidingStep);
+      long slidingStep)
+      throws StatementExecutionException, IoTDBConnectionException;
 
   int getMaxSize();
 
diff --git a/isession/src/main/java/org/apache/iotdb/isession/util/Aggregation.java b/isession/src/main/java/org/apache/iotdb/isession/util/Aggregation.java
index 264032cf02..8d612c2b04 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/util/Aggregation.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/util/Aggregation.java
@@ -30,4 +30,9 @@ public enum Aggregation {
   MAX_VALUE,
   MIN_VALUE,
   EXTREME;
+
+  @Override
+  public String toString() {
+    return name().toLowerCase();
+  }
 }
diff --git a/session/pom.xml b/session/pom.xml
index 7097aba703..365a8bfd78 100644
--- a/session/pom.xml
+++ b/session/pom.xml
@@ -87,6 +87,12 @@
             <version>${project.version}</version>
             <scope>compile</scope>
         </dependency>
+        <!-- antlr -->
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-antlr</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
     <profiles>
         <profile>
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 196b98f2ef..354eac731e 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -839,15 +839,48 @@ public class Session implements ISession {
   }
 
   @Override
-  public SessionDataSet executeAggregationQuery(
-      List<String> paths, List<Aggregation> aggregations) {
-    return null;
+  public SessionDataSet executeAggregationQuery(List<String> paths, List<Aggregation> aggregations)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return defaultSessionConnection.executeAggregationQuery(paths, aggregations);
+    } catch (RedirectException e) {
+      handleQueryRedirection(e.getEndPoint());
+      if (enableQueryRedirection) {
+        // retry
+        try {
+          return defaultSessionConnection.executeAggregationQuery(paths, aggregations);
+        } catch (RedirectException redirectException) {
+          logger.error("redirect twice", redirectException);
+          throw new StatementExecutionException("redirect twice, please try again.");
+        }
+      } else {
+        throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
+      }
+    }
   }
 
   @Override
   public SessionDataSet executeAggregationQuery(
-      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime) {
-    return null;
+      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return defaultSessionConnection.executeAggregationQuery(
+          paths, aggregations, startTime, endTime);
+    } catch (RedirectException e) {
+      handleQueryRedirection(e.getEndPoint());
+      if (enableQueryRedirection) {
+        // retry
+        try {
+          return defaultSessionConnection.executeAggregationQuery(
+              paths, aggregations, startTime, endTime);
+        } catch (RedirectException redirectException) {
+          logger.error("redirect twice", redirectException);
+          throw new StatementExecutionException("redirect twice, please try again.");
+        }
+      } else {
+        throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
+      }
+    }
   }
 
   @Override
@@ -856,8 +889,26 @@ public class Session implements ISession {
       List<Aggregation> aggregations,
       long startTime,
       long endTime,
-      long interval) {
-    return null;
+      long interval)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return defaultSessionConnection.executeAggregationQuery(
+          paths, aggregations, startTime, endTime, interval);
+    } catch (RedirectException e) {
+      handleQueryRedirection(e.getEndPoint());
+      if (enableQueryRedirection) {
+        // retry
+        try {
+          return defaultSessionConnection.executeAggregationQuery(
+              paths, aggregations, startTime, endTime, interval);
+        } catch (RedirectException redirectException) {
+          logger.error("redirect twice", redirectException);
+          throw new StatementExecutionException("redirect twice, please try again.");
+        }
+      } else {
+        throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
+      }
+    }
   }
 
   @Override
@@ -867,8 +918,26 @@ public class Session implements ISession {
       long startTime,
       long endTime,
       long interval,
-      long slidingStep) {
-    return null;
+      long slidingStep)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return defaultSessionConnection.executeAggregationQuery(
+          paths, aggregations, startTime, endTime, interval, slidingStep);
+    } catch (RedirectException e) {
+      handleQueryRedirection(e.getEndPoint());
+      if (enableQueryRedirection) {
+        // retry
+        try {
+          return defaultSessionConnection.executeAggregationQuery(
+              paths, aggregations, startTime, endTime, interval, slidingStep);
+        } catch (RedirectException redirectException) {
+          logger.error("redirect twice", redirectException);
+          throw new StatementExecutionException("redirect twice, please try again.");
+        }
+      } else {
+        throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
+      }
+    }
   }
 
   /**
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 8f6d9306f2..a2282ad80c 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -20,15 +20,18 @@
 package org.apache.iotdb.session;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TPartialPath;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.isession.SessionConfig;
 import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.isession.util.Aggregation;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RedirectException;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
 import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
@@ -60,6 +63,7 @@ import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
 import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
 import org.apache.iotdb.session.util.SessionUtils;
+import org.apache.iotdb.tsfile.read.common.parser.PathNodesGenerator;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -71,9 +75,11 @@ import org.slf4j.LoggerFactory;
 
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import java.util.StringJoiner;
+import java.util.stream.Collectors;
 
 public class SessionConnection {
 
@@ -487,6 +493,109 @@ public class SessionConnection {
         tsExecuteStatementResp.moreData);
   }
 
+  protected SessionDataSet executeAggregationQuery(
+      List<String> paths, List<Aggregation> aggregations)
+      throws StatementExecutionException, IoTDBConnectionException, RedirectException {
+    TSAggregationQueryReq req = createAggregationQueryReq(paths, aggregations);
+    return executeAggregationQuery(req);
+  }
+
+  protected SessionDataSet executeAggregationQuery(
+      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime)
+      throws StatementExecutionException, IoTDBConnectionException, RedirectException {
+    TSAggregationQueryReq req = createAggregationQueryReq(paths, aggregations);
+    req.setStartTime(startTime);
+    req.setEndTime(endTime);
+    return executeAggregationQuery(req);
+  }
+
+  protected SessionDataSet executeAggregationQuery(
+      List<String> paths,
+      List<Aggregation> aggregations,
+      long startTime,
+      long endTime,
+      long interval)
+      throws StatementExecutionException, IoTDBConnectionException, RedirectException {
+    TSAggregationQueryReq req = createAggregationQueryReq(paths, aggregations);
+    req.setStartTime(startTime);
+    req.setEndTime(endTime);
+    req.setInterval(interval);
+    return executeAggregationQuery(req);
+  }
+
+  protected SessionDataSet executeAggregationQuery(
+      List<String> paths,
+      List<Aggregation> aggregations,
+      long startTime,
+      long endTime,
+      long interval,
+      long slidingStep)
+      throws StatementExecutionException, IoTDBConnectionException, RedirectException {
+    TSAggregationQueryReq req = createAggregationQueryReq(paths, aggregations);
+    req.setStartTime(startTime);
+    req.setEndTime(endTime);
+    req.setInterval(interval);
+    req.setSlidingStep(slidingStep);
+    return executeAggregationQuery(req);
+  }
+
+  private SessionDataSet executeAggregationQuery(TSAggregationQueryReq tsAggregationQueryReq)
+      throws StatementExecutionException, IoTDBConnectionException, RedirectException {
+    TSExecuteStatementResp tsExecuteStatementResp;
+    try {
+      tsExecuteStatementResp = client.executeAggregationQuery(tsAggregationQueryReq);
+      RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
+    } catch (TException e) {
+      if (reconnect()) {
+        try {
+          tsAggregationQueryReq.setSessionId(sessionId);
+          tsAggregationQueryReq.setStatementId(statementId);
+          tsExecuteStatementResp = client.executeAggregationQuery(tsAggregationQueryReq);
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException(logForReconnectionFailure());
+      }
+    }
+
+    RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
+    return new SessionDataSet(
+        "",
+        tsExecuteStatementResp.getColumns(),
+        tsExecuteStatementResp.getDataTypeList(),
+        tsExecuteStatementResp.columnNameIndexMap,
+        tsExecuteStatementResp.getQueryId(),
+        statementId,
+        client,
+        sessionId,
+        tsExecuteStatementResp.queryResult,
+        tsExecuteStatementResp.isIgnoreTimeStamp(),
+        tsExecuteStatementResp.moreData);
+  }
+
+  private List<TPartialPath> convertToPartialPaths(List<String> paths) {
+    List<TPartialPath> selectPaths = new ArrayList<>();
+    for (String pathStr : paths) {
+      selectPaths.add(
+          new TPartialPath(Arrays.asList(PathNodesGenerator.splitPathToNodes(pathStr))));
+    }
+    return selectPaths;
+  }
+
+  private TSAggregationQueryReq createAggregationQueryReq(
+      List<String> paths, List<Aggregation> aggregations) {
+    TSAggregationQueryReq req =
+        new TSAggregationQueryReq(
+            sessionId,
+            statementId,
+            convertToPartialPaths(paths),
+            aggregations.stream().map(Enum::toString).collect(Collectors.toList()));
+    req.setFetchSize(session.getFetchSize());
+    req.setTimeout(session.getQueryTimeout());
+    return req;
+  }
+
   protected void insertRecord(TSInsertRecordReq request)
       throws IoTDBConnectionException, StatementExecutionException, RedirectException {
     request.setSessionId(sessionId);
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 89590cb81f..be024bae5e 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -2556,35 +2556,111 @@ public class SessionPool implements ISessionPool {
   }
 
   @Override
-  public SessionDataSet executeAggregationQuery(
-      List<String> paths, List<Aggregation> aggregations) {
+  public SessionDataSetWrapper executeAggregationQuery(
+      List<String> paths, List<Aggregation> aggregations)
+      throws StatementExecutionException, IoTDBConnectionException {
+    for (int i = 0; i < RETRY; i++) {
+      ISession session = getSession();
+      try {
+        SessionDataSet resp = session.executeAggregationQuery(paths, aggregations);
+        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+        occupy(session);
+        return wrapper;
+      } catch (IoTDBConnectionException e) {
+        // IoTDBConnectionException means the connection is broken, remove it and get a new one.
+        logger.warn("executeAggregationQuery failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    // should never get here
     return null;
   }
 
   @Override
-  public SessionDataSet executeAggregationQuery(
-      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime) {
+  public SessionDataSetWrapper executeAggregationQuery(
+      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime)
+      throws StatementExecutionException, IoTDBConnectionException {
+    for (int i = 0; i < RETRY; i++) {
+      ISession session = getSession();
+      try {
+        SessionDataSet resp =
+            session.executeAggregationQuery(paths, aggregations, startTime, endTime);
+        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+        occupy(session);
+        return wrapper;
+      } catch (IoTDBConnectionException e) {
+        // IoTDBConnectionException means the connection is broken, remove it and get a new one.
+        logger.warn("executeAggregationQuery failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    // should never get here
     return null;
   }
 
   @Override
-  public SessionDataSet executeAggregationQuery(
+  public SessionDataSetWrapper executeAggregationQuery(
       List<String> paths,
       List<Aggregation> aggregations,
       long startTime,
       long endTime,
-      long interval) {
+      long interval)
+      throws StatementExecutionException, IoTDBConnectionException {
+    for (int i = 0; i < RETRY; i++) {
+      ISession session = getSession();
+      try {
+        SessionDataSet resp =
+            session.executeAggregationQuery(paths, aggregations, startTime, endTime, interval);
+        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+        occupy(session);
+        return wrapper;
+      } catch (IoTDBConnectionException e) {
+        // IoTDBConnectionException means the connection is broken, remove it and get a new one.
+        logger.warn("executeAggregationQuery failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    // should never get here
     return null;
   }
 
   @Override
-  public SessionDataSet executeAggregationQuery(
+  public SessionDataSetWrapper executeAggregationQuery(
       List<String> paths,
       List<Aggregation> aggregations,
       long startTime,
       long endTime,
       long interval,
-      long slidingStep) {
+      long slidingStep)
+      throws StatementExecutionException, IoTDBConnectionException {
+    for (int i = 0; i < RETRY; i++) {
+      ISession session = getSession();
+      try {
+        SessionDataSet resp =
+            session.executeAggregationQuery(
+                paths, aggregations, startTime, endTime, interval, slidingStep);
+        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+        occupy(session);
+        return wrapper;
+      } catch (IoTDBConnectionException e) {
+        // IoTDBConnectionException means the connection is broken, remove it and get a new one.
+        logger.warn("executeAggregationQuery failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    // should never get here
     return null;
   }