You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/29 09:21:11 UTC

[iotdb] branch lmh/querySession created (now 5c1626714e)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a change to branch lmh/querySession
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 5c1626714e add ITs

This branch includes the following new commits:

     new 9d6cd39831 add interface in thrift
     new a0db9be0fb add interface in ISession & ISessionPool
     new f576b91588 implement server
     new 2f373dfb08 implement session
     new 5c1626714e add ITs

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 04/05: implement session

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/querySession
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2f373dfb0825ce1a481486dc62556728580a1184
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Thu Dec 29 16:06:50 2022 +0800

    implement session
---
 .../main/java/org/apache/iotdb/SessionExample.java |  86 +++++++++-------
 .../java/org/apache/iotdb/isession/ISession.java   |  12 ++-
 .../apache/iotdb/isession/pool/ISessionPool.java   |  19 ++--
 .../apache/iotdb/isession/util/Aggregation.java    |   5 +
 session/pom.xml                                    |   6 ++
 .../java/org/apache/iotdb/session/Session.java     |  87 ++++++++++++++--
 .../apache/iotdb/session/SessionConnection.java    | 109 +++++++++++++++++++++
 .../org/apache/iotdb/session/pool/SessionPool.java |  92 +++++++++++++++--
 8 files changed, 350 insertions(+), 66 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 3f77f67324..085bf00f74 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -22,10 +22,10 @@ package org.apache.iotdb;
 import org.apache.iotdb.isession.SessionDataSet;
 import org.apache.iotdb.isession.SessionDataSet.DataIterator;
 import org.apache.iotdb.isession.template.Template;
+import org.apache.iotdb.isession.util.Aggregation;
 import org.apache.iotdb.isession.util.Version;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.session.template.MeasurementNode;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -73,45 +73,57 @@ public class SessionExample {
     // set session fetchSize
     session.setFetchSize(10000);
 
-    try {
-      session.createDatabase("root.sg1");
-    } catch (StatementExecutionException e) {
-      if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()) {
-        throw e;
+    try (SessionDataSet dataSet =
+        session.executeAggregationQuery(
+            Collections.singletonList("root.sg1.d1.*"),
+            Collections.singletonList(Aggregation.COUNT))) {
+
+      System.out.println(dataSet.getColumnNames());
+      dataSet.setFetchSize(1024);
+      while (dataSet.hasNext()) {
+        System.out.println(dataSet.next());
       }
     }
 
-    //     createTemplate();
-    createTimeseries();
-    createMultiTimeseries();
-    insertRecord();
-    insertTablet();
-    //    insertTabletWithNullValues();
-    //    insertTablets();
-    //    insertRecords();
-    //    insertText();
-    //    selectInto();
-    //    createAndDropContinuousQueries();
-    //    nonQuery();
-    //    query();
-    //    queryWithTimeout();
-    //    rawDataQuery();
-    //    lastDataQuery();
-    //    queryByIterator();
-    //    deleteData();
-    //    deleteTimeseries();
-    //    setTimeout();
-
-    sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
-    sessionEnableRedirect.setEnableQueryRedirection(true);
-    sessionEnableRedirect.open(false);
-
-    // set session fetchSize
-    sessionEnableRedirect.setFetchSize(10000);
-
-    insertRecord4Redirect();
-    query4Redirect();
-    sessionEnableRedirect.close();
+    //    try {
+    //      session.createDatabase("root.sg1");
+    //    } catch (StatementExecutionException e) {
+    //      if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()) {
+    //        throw e;
+    //      }
+    //    }
+    //
+    //    //     createTemplate();
+    //    createTimeseries();
+    //    createMultiTimeseries();
+    //    insertRecord();
+    //    insertTablet();
+    //    //    insertTabletWithNullValues();
+    //    //    insertTablets();
+    //    //    insertRecords();
+    //    //    insertText();
+    //    //    selectInto();
+    //    //    createAndDropContinuousQueries();
+    //    //    nonQuery();
+    //    //    query();
+    //    //    queryWithTimeout();
+    //    //    rawDataQuery();
+    //    //    lastDataQuery();
+    //    //    queryByIterator();
+    //    //    deleteData();
+    //    //    deleteTimeseries();
+    //    //    setTimeout();
+    //
+    //    sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
+    //    sessionEnableRedirect.setEnableQueryRedirection(true);
+    //    sessionEnableRedirect.open(false);
+    //
+    //    // set session fetchSize
+    //    sessionEnableRedirect.setFetchSize(10000);
+    //
+    //    insertRecord4Redirect();
+    //    query4Redirect();
+    //    sessionEnableRedirect.close();
     session.close();
   }
 
diff --git a/isession/src/main/java/org/apache/iotdb/isession/ISession.java b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 3b26c5a89e..6a911876bc 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -158,17 +158,20 @@ public interface ISession extends AutoCloseable {
   SessionDataSet executeLastDataQuery(List<String> paths)
       throws StatementExecutionException, IoTDBConnectionException;
 
-  SessionDataSet executeAggregationQuery(List<String> paths, List<Aggregation> aggregations);
+  SessionDataSet executeAggregationQuery(List<String> paths, List<Aggregation> aggregations)
+      throws StatementExecutionException, IoTDBConnectionException;
 
   SessionDataSet executeAggregationQuery(
-      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime);
+      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime)
+      throws StatementExecutionException, IoTDBConnectionException;
 
   SessionDataSet executeAggregationQuery(
       List<String> paths,
       List<Aggregation> aggregations,
       long startTime,
       long endTime,
-      long interval);
+      long interval)
+      throws StatementExecutionException, IoTDBConnectionException;
 
   SessionDataSet executeAggregationQuery(
       List<String> paths,
@@ -176,7 +179,8 @@ public interface ISession extends AutoCloseable {
       long startTime,
       long endTime,
       long interval,
-      long slidingStep);
+      long slidingStep)
+      throws StatementExecutionException, IoTDBConnectionException;
 
   void insertRecord(
       String deviceId,
diff --git a/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java b/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
index 0367b72b97..536c2b3fef 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.isession.pool;
 
-import org.apache.iotdb.isession.SessionDataSet;
 import org.apache.iotdb.isession.template.Template;
 import org.apache.iotdb.isession.util.Aggregation;
 import org.apache.iotdb.isession.util.SystemStatus;
@@ -419,25 +418,29 @@ public interface ISessionPool {
   SessionDataSetWrapper executeLastDataQuery(List<String> paths)
       throws StatementExecutionException, IoTDBConnectionException;
 
-  SessionDataSet executeAggregationQuery(List<String> paths, List<Aggregation> aggregations);
+  SessionDataSetWrapper executeAggregationQuery(List<String> paths, List<Aggregation> aggregations)
+      throws StatementExecutionException, IoTDBConnectionException;
 
-  SessionDataSet executeAggregationQuery(
-      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime);
+  SessionDataSetWrapper executeAggregationQuery(
+      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime)
+      throws StatementExecutionException, IoTDBConnectionException;
 
-  SessionDataSet executeAggregationQuery(
+  SessionDataSetWrapper executeAggregationQuery(
       List<String> paths,
       List<Aggregation> aggregations,
       long startTime,
       long endTime,
-      long interval);
+      long interval)
+      throws StatementExecutionException, IoTDBConnectionException;
 
-  SessionDataSet executeAggregationQuery(
+  SessionDataSetWrapper executeAggregationQuery(
       List<String> paths,
       List<Aggregation> aggregations,
       long startTime,
       long endTime,
       long interval,
-      long slidingStep);
+      long slidingStep)
+      throws StatementExecutionException, IoTDBConnectionException;
 
   int getMaxSize();
 
diff --git a/isession/src/main/java/org/apache/iotdb/isession/util/Aggregation.java b/isession/src/main/java/org/apache/iotdb/isession/util/Aggregation.java
index 264032cf02..8d612c2b04 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/util/Aggregation.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/util/Aggregation.java
@@ -30,4 +30,16 @@ public enum Aggregation {
   MAX_VALUE,
   MIN_VALUE,
   EXTREME;
+
+  /**
+   * Returns the lower-case form of this constant's name, e.g. {@code min_value} for {@code
+   * MIN_VALUE}.
+   *
+   * <p>The conversion uses {@link java.util.Locale#ROOT} so the result does not depend on the JVM
+   * default locale (under a Turkish locale a plain {@code toLowerCase()} maps 'I' to a dotless i).
+   */
+  @Override
+  public String toString() {
+    return name().toLowerCase(java.util.Locale.ROOT);
+  }
 }
diff --git a/session/pom.xml b/session/pom.xml
index 7097aba703..365a8bfd78 100644
--- a/session/pom.xml
+++ b/session/pom.xml
@@ -87,6 +87,12 @@
             <version>${project.version}</version>
             <scope>compile</scope>
         </dependency>
+        <!-- antlr -->
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-antlr</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
     <profiles>
         <profile>
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 196b98f2ef..354eac731e 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -839,15 +839,48 @@ public class Session implements ISession {
   }
 
   @Override
-  public SessionDataSet executeAggregationQuery(
-      List<String> paths, List<Aggregation> aggregations) {
-    return null;
+  public SessionDataSet executeAggregationQuery(List<String> paths, List<Aggregation> aggregations)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return defaultSessionConnection.executeAggregationQuery(paths, aggregations);
+    } catch (RedirectException e) {
+      handleQueryRedirection(e.getEndPoint());
+      if (enableQueryRedirection) {
+        // retry once; a second RedirectException is logged and reported as a failure
+        try {
+          return defaultSessionConnection.executeAggregationQuery(paths, aggregations);
+        } catch (RedirectException redirectException) {
+          logger.error("redirect twice", redirectException);
+          throw new StatementExecutionException("redirect twice, please try again.");
+        }
+      } else {
+        throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
+      }
+    }
   }
 
   @Override
   public SessionDataSet executeAggregationQuery(
-      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime) {
-    return null;
+      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return defaultSessionConnection.executeAggregationQuery(
+          paths, aggregations, startTime, endTime);
+    } catch (RedirectException e) {
+      handleQueryRedirection(e.getEndPoint());
+      if (enableQueryRedirection) {
+        // retry once; a second RedirectException is logged and reported as a failure
+        try {
+          return defaultSessionConnection.executeAggregationQuery(
+              paths, aggregations, startTime, endTime);
+        } catch (RedirectException redirectException) {
+          logger.error("redirect twice", redirectException);
+          throw new StatementExecutionException("redirect twice, please try again.");
+        }
+      } else {
+        throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
+      }
+    }
   }
 
   @Override
@@ -856,8 +889,26 @@ public class Session implements ISession {
       List<Aggregation> aggregations,
       long startTime,
       long endTime,
-      long interval) {
-    return null;
+      long interval)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return defaultSessionConnection.executeAggregationQuery(
+          paths, aggregations, startTime, endTime, interval);
+    } catch (RedirectException e) {
+      handleQueryRedirection(e.getEndPoint());
+      if (enableQueryRedirection) {
+        // retry
+        try {
+          return defaultSessionConnection.executeAggregationQuery(
+              paths, aggregations, startTime, endTime, interval);
+        } catch (RedirectException redirectException) {
+          logger.error("redirect twice", redirectException);
+          throw new StatementExecutionException("redirect twice, please try again.");
+        }
+      } else {
+        throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
+      }
+    }
   }
 
   @Override
@@ -867,8 +918,26 @@ public class Session implements ISession {
       long startTime,
       long endTime,
       long interval,
-      long slidingStep) {
-    return null;
+      long slidingStep)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return defaultSessionConnection.executeAggregationQuery(
+          paths, aggregations, startTime, endTime, interval, slidingStep);
+    } catch (RedirectException e) {
+      handleQueryRedirection(e.getEndPoint());
+      if (enableQueryRedirection) {
+        // retry
+        try {
+          return defaultSessionConnection.executeAggregationQuery(
+              paths, aggregations, startTime, endTime, interval, slidingStep);
+        } catch (RedirectException redirectException) {
+          logger.error("redirect twice", redirectException);
+          throw new StatementExecutionException("redirect twice, please try again.");
+        }
+      } else {
+        throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
+      }
+    }
   }
 
   /**
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 8f6d9306f2..a2282ad80c 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -20,15 +20,18 @@
 package org.apache.iotdb.session;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TPartialPath;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.isession.SessionConfig;
 import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.isession.util.Aggregation;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RedirectException;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
 import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
@@ -60,6 +63,7 @@ import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
 import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
 import org.apache.iotdb.session.util.SessionUtils;
+import org.apache.iotdb.tsfile.read.common.parser.PathNodesGenerator;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -71,9 +75,11 @@ import org.slf4j.LoggerFactory;
 
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import java.util.StringJoiner;
+import java.util.stream.Collectors;
 
 public class SessionConnection {
 
@@ -487,6 +493,109 @@ public class SessionConnection {
         tsExecuteStatementResp.moreData);
   }
 
+  protected SessionDataSet executeAggregationQuery(
+      List<String> paths, List<Aggregation> aggregations)
+      throws StatementExecutionException, IoTDBConnectionException, RedirectException {
+    TSAggregationQueryReq req = createAggregationQueryReq(paths, aggregations);
+    return executeAggregationQuery(req);
+  }
+
+  protected SessionDataSet executeAggregationQuery(
+      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime)
+      throws StatementExecutionException, IoTDBConnectionException, RedirectException {
+    TSAggregationQueryReq req = createAggregationQueryReq(paths, aggregations);
+    req.setStartTime(startTime);
+    req.setEndTime(endTime);
+    return executeAggregationQuery(req);
+  }
+
+  protected SessionDataSet executeAggregationQuery(
+      List<String> paths,
+      List<Aggregation> aggregations,
+      long startTime,
+      long endTime,
+      long interval)
+      throws StatementExecutionException, IoTDBConnectionException, RedirectException {
+    TSAggregationQueryReq req = createAggregationQueryReq(paths, aggregations);
+    req.setStartTime(startTime);
+    req.setEndTime(endTime);
+    req.setInterval(interval);
+    return executeAggregationQuery(req);
+  }
+
+  protected SessionDataSet executeAggregationQuery(
+      List<String> paths,
+      List<Aggregation> aggregations,
+      long startTime,
+      long endTime,
+      long interval,
+      long slidingStep)
+      throws StatementExecutionException, IoTDBConnectionException, RedirectException {
+    TSAggregationQueryReq req = createAggregationQueryReq(paths, aggregations);
+    req.setStartTime(startTime);
+    req.setEndTime(endTime);
+    req.setInterval(interval);
+    req.setSlidingStep(slidingStep);
+    return executeAggregationQuery(req);
+  }
+
+  private SessionDataSet executeAggregationQuery(TSAggregationQueryReq tsAggregationQueryReq)
+      throws StatementExecutionException, IoTDBConnectionException, RedirectException {
+    TSExecuteStatementResp tsExecuteStatementResp;
+    try {
+      tsExecuteStatementResp = client.executeAggregationQuery(tsAggregationQueryReq);
+      RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
+    } catch (TException e) {
+      if (reconnect()) {
+        try {
+          tsAggregationQueryReq.setSessionId(sessionId);
+          tsAggregationQueryReq.setStatementId(statementId);
+          tsExecuteStatementResp = client.executeAggregationQuery(tsAggregationQueryReq);
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException(logForReconnectionFailure());
+      }
+    }
+
+    RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
+    return new SessionDataSet(
+        "",
+        tsExecuteStatementResp.getColumns(),
+        tsExecuteStatementResp.getDataTypeList(),
+        tsExecuteStatementResp.columnNameIndexMap,
+        tsExecuteStatementResp.getQueryId(),
+        statementId,
+        client,
+        sessionId,
+        tsExecuteStatementResp.queryResult,
+        tsExecuteStatementResp.isIgnoreTimeStamp(),
+        tsExecuteStatementResp.moreData);
+  }
+
+  private List<TPartialPath> convertToPartialPaths(List<String> paths) {
+    List<TPartialPath> selectPaths = new ArrayList<>();
+    for (String pathStr : paths) {
+      selectPaths.add(
+          new TPartialPath(Arrays.asList(PathNodesGenerator.splitPathToNodes(pathStr))));
+    }
+    return selectPaths;
+  }
+
+  private TSAggregationQueryReq createAggregationQueryReq(
+      List<String> paths, List<Aggregation> aggregations) {
+    TSAggregationQueryReq req =
+        new TSAggregationQueryReq(
+            sessionId,
+            statementId,
+            convertToPartialPaths(paths),
+            aggregations.stream().map(Enum::toString).collect(Collectors.toList()));
+    req.setFetchSize(session.getFetchSize());
+    req.setTimeout(session.getQueryTimeout());
+    return req;
+  }
+
   protected void insertRecord(TSInsertRecordReq request)
       throws IoTDBConnectionException, StatementExecutionException, RedirectException {
     request.setSessionId(sessionId);
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 89590cb81f..be024bae5e 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -2556,35 +2556,111 @@ public class SessionPool implements ISessionPool {
   }
 
   @Override
-  public SessionDataSet executeAggregationQuery(
-      List<String> paths, List<Aggregation> aggregations) {
+  public SessionDataSetWrapper executeAggregationQuery(
+      List<String> paths, List<Aggregation> aggregations)
+      throws StatementExecutionException, IoTDBConnectionException {
+    for (int i = 0; i < RETRY; i++) {
+      ISession session = getSession();
+      try {
+        SessionDataSet resp = session.executeAggregationQuery(paths, aggregations);
+        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+        occupy(session);
+        return wrapper;
+      } catch (IoTDBConnectionException e) {
+        // The connection is presumably broken: clean up this session and retry with another one.
+        logger.warn("executeAggregationQuery failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    // presumably unreachable: the last attempt is expected to return or rethrow (TODO confirm)
     return null;
   }
 
   @Override
-  public SessionDataSet executeAggregationQuery(
-      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime) {
+  public SessionDataSetWrapper executeAggregationQuery(
+      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime)
+      throws StatementExecutionException, IoTDBConnectionException {
+    for (int i = 0; i < RETRY; i++) {
+      ISession session = getSession();
+      try {
+        SessionDataSet resp =
+            session.executeAggregationQuery(paths, aggregations, startTime, endTime);
+        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+        occupy(session);
+        return wrapper;
+      } catch (IoTDBConnectionException e) {
+        // The connection is presumably broken: clean up this session and retry with another one.
+        logger.warn("executeAggregationQuery failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    // presumably unreachable: the last attempt is expected to return or rethrow (TODO confirm)
     return null;
   }
 
   @Override
-  public SessionDataSet executeAggregationQuery(
+  public SessionDataSetWrapper executeAggregationQuery(
       List<String> paths,
       List<Aggregation> aggregations,
       long startTime,
       long endTime,
-      long interval) {
+      long interval)
+      throws StatementExecutionException, IoTDBConnectionException {
+    for (int i = 0; i < RETRY; i++) {
+      ISession session = getSession();
+      try {
+        SessionDataSet resp =
+            session.executeAggregationQuery(paths, aggregations, startTime, endTime, interval);
+        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+        occupy(session);
+        return wrapper;
+      } catch (IoTDBConnectionException e) {
+        // The connection is presumably broken: clean up this session and retry with another one.
+        logger.warn("executeAggregationQuery failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    // presumably unreachable: the last attempt is expected to return or rethrow (TODO confirm)
     return null;
   }
 
   @Override
-  public SessionDataSet executeAggregationQuery(
+  public SessionDataSetWrapper executeAggregationQuery(
       List<String> paths,
       List<Aggregation> aggregations,
       long startTime,
       long endTime,
       long interval,
-      long slidingStep) {
+      long slidingStep)
+      throws StatementExecutionException, IoTDBConnectionException {
+    for (int i = 0; i < RETRY; i++) {
+      ISession session = getSession();
+      try {
+        SessionDataSet resp =
+            session.executeAggregationQuery(
+                paths, aggregations, startTime, endTime, interval, slidingStep);
+        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+        occupy(session);
+        return wrapper;
+      } catch (IoTDBConnectionException e) {
+        // The connection is presumably broken: clean up this session and retry with another one.
+        logger.warn("executeAggregationQuery failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    // presumably unreachable: the last attempt is expected to return or rethrow (TODO confirm)
     return null;
   }
 


[iotdb] 01/05: add interface in thrift

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/querySession
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9d6cd39831f6d5d5dc24cc3a5d56e72bf04f3afd
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Dec 28 16:27:18 2022 +0800

    add interface in thrift
---
 thrift-commons/src/main/thrift/common.thrift |  4 ++++
 thrift/src/main/thrift/client.thrift         | 23 +++++++++++++++++++----
 2 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/thrift-commons/src/main/thrift/common.thrift b/thrift-commons/src/main/thrift/common.thrift
index fecceb80a8..b10f9f1945 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -123,3 +123,7 @@ struct TFilesResp {
   1: required TSStatus status
   2: required list<TFile> files
 }
+
+struct TPartialPath {
+  1: required list<string> nodes;
+}
\ No newline at end of file
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index ec216613ea..e8c2167efe 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -320,8 +320,8 @@ struct TSRawDataQueryReq {
   4: required i64 startTime
   5: required i64 endTime
   6: required i64 statementId
-  7: optional bool enableRedirectQuery;
-  8: optional bool jdbcQuery;
+  7: optional bool enableRedirectQuery
+  8: optional bool jdbcQuery
   9: optional i64 timeout
 }
 
@@ -331,11 +331,24 @@ struct TSLastDataQueryReq {
   3: optional i32 fetchSize
   4: required i64 time
   5: required i64 statementId
-  6: optional bool enableRedirectQuery;
-  7: optional bool jdbcQuery;
+  6: optional bool enableRedirectQuery
+  7: optional bool jdbcQuery
   8: optional i64 timeout
 }
 
+struct TSAggregationQueryReq {
+  1: required i64 sessionId
+  2: required i64 statementId
+  3: required list<common.TPartialPath> paths
+  4: required list<string> aggregations
+  5: optional i64 startTime
+  6: optional i64 endTime
+  7: optional i64 interval
+  8: optional i64 slidingStep
+  9: optional i32 fetchSize
+  10: optional i64 timeout
+}
+
 struct TSCreateMultiTimeseriesReq {
   1: required i64 sessionId
   2: required list<string> paths
@@ -469,6 +482,8 @@ service IClientRPCService {
 
   TSExecuteStatementResp executeLastDataQueryV2(1:TSLastDataQueryReq req);
 
+  TSExecuteStatementResp executeAggregationQuery(1:TSAggregationQueryReq req);
+
   TSFetchResultsResp fetchResultsV2(1:TSFetchResultsReq req);
 
   TSOpenSessionResp openSession(1:TSOpenSessionReq req);


[iotdb] 05/05: add ITs

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/querySession
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5c1626714e47b1000528a63449153db8def40c90
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Thu Dec 29 16:06:58 2022 +0800

    add ITs
---
 integration-test/import-control.xml                |   2 +
 .../db/it/aligned/IoTDBAlignedSeriesQueryIT.java   |  10 -
 .../apache/iotdb/db/it/utils/AlignedWriteUtil.java |  12 +
 .../org/apache/iotdb/db/it/utils/TestUtils.java    |  32 ++
 .../iotdb/session/it/IoTDBSessionQueryIT.java      | 382 +++++++++++++++++++++
 5 files changed, 428 insertions(+), 10 deletions(-)

diff --git a/integration-test/import-control.xml b/integration-test/import-control.xml
index 0465588691..a0e5e694f3 100644
--- a/integration-test/import-control.xml
+++ b/integration-test/import-control.xml
@@ -90,6 +90,7 @@
     <allow class="org.apache.iotdb.db.conf.IoTDBDescriptor" />
     <allow class="org.apache.iotdb.db.conf.OperationType" />
     <allow class="org.apache.iotdb.db.utils.EnvironmentUtils" />
+    <allow class="org.apache.iotdb.db.it.utils.AlignedWriteUtil" />
     <allow class="org.apache.iotdb.tsfile.common.constant.TsFileConstant" />
     <allow class="org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp" />
     <allow class="org.apache.iotdb.service.rpc.thrift.TSConnectionType" />
@@ -101,6 +102,7 @@
     <allow pkg="org\.apache\.iotdb\.tsfile\.file\.metadata.*" regex="true" />
     <allow pkg="org.apache.iotdb.db.metadata.idtable.trigger_example" />
     <allow pkg="org.apache.iotdb.session.template" />
+    <allow pkg="org\.apache\.iotdb\.db\.it\.utils\.TestUtils.*" regex="true" />
   </subpackage>
   <subpackage name="zeppelin.it">
     <allow class="org.apache.zeppelin.interpreter.InterpreterResult" />
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQueryIT.java
index c24e03603b..e651a085ab 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQueryIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBAlignedSeriesQueryIT.java
@@ -2199,7 +2199,6 @@ public class IoTDBAlignedSeriesQueryIT {
     }
   }
 
-  // Remove after supporting value filter
   @Test
   public void selectAllAlignedWithValueFilterAlignByDeviceTest1() {
     String[] retArray =
@@ -2250,7 +2249,6 @@ public class IoTDBAlignedSeriesQueryIT {
     }
   }
 
-  // Remove after supporting value filter
   @Test
   public void selectAllAlignedWithValueFilterAlignByDeviceTest2() {
     String[] retArray =
@@ -2299,7 +2297,6 @@ public class IoTDBAlignedSeriesQueryIT {
     }
   }
 
-  // Remove after supporting value filter
   @Test
   public void selectAllAlignedWithTimeAndValueFilterAlignByDeviceTest1() {
     String[] retArray =
@@ -2348,7 +2345,6 @@ public class IoTDBAlignedSeriesQueryIT {
     }
   }
 
-  // Remove after supporting value filter
   @Test
   public void selectSomeAlignedWithValueFilterAlignByDeviceTest1() {
     String[] retArray =
@@ -2403,7 +2399,6 @@ public class IoTDBAlignedSeriesQueryIT {
     }
   }
 
-  // Remove after supporting value filter
   @Test
   public void selectSomeAlignedWithValueFilterAlignByDeviceTest2() {
     String[] retArray =
@@ -2610,7 +2605,6 @@ public class IoTDBAlignedSeriesQueryIT {
     }
   }
 
-  // Remove after supporting value filter
   @Test
   public void countAlignedWithValueFilterAlignByDeviceTest() {
     String[] retArray = new String[] {"root.sg1.d1", "11"};
@@ -2648,7 +2642,6 @@ public class IoTDBAlignedSeriesQueryIT {
     }
   }
 
-  // Remove after supporting value filter
   @Test
   public void aggregationFuncAlignedWithValueFilterAlignByDeviceTest() {
     String[] retArray =
@@ -2702,7 +2695,6 @@ public class IoTDBAlignedSeriesQueryIT {
     }
   }
 
-  // Remove after supporting value filter
   @Test
   public void countAllAlignedWithValueFilterAlignByDeviceTest() {
     String[] retArray = new String[] {"root.sg1.d1", "6", "6", "9", "11", "6"};
@@ -2742,7 +2734,6 @@ public class IoTDBAlignedSeriesQueryIT {
     }
   }
 
-  // Remove after supporting value filter
   @Test
   public void aggregationAllAlignedWithValueFilterAlignByDeviceTest() {
     String[] retArray = new String[] {"root.sg1.d1", "160016.0", "11", "1", "13"};
@@ -3027,7 +3018,6 @@ public class IoTDBAlignedSeriesQueryIT {
     }
   }
 
-  // Remove after supporting value filter
   @Test
   public void countSumAvgValueFillAlignByDeviceTest() throws SQLException {
     String[] retArray =
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/AlignedWriteUtil.java b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/AlignedWriteUtil.java
index 1671ddf76e..798f1b2b88 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/AlignedWriteUtil.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/AlignedWriteUtil.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.it.utils;
 
+import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.it.env.EnvFactory;
 
 import java.sql.Connection;
@@ -145,4 +146,15 @@ public class AlignedWriteUtil {
       fail(e.getMessage());
     }
   }
+
+  public static void insertDataWithSession() {
+    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      for (String sql : sqls) {
+        session.executeNonQueryStatement(sql);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index 7aa09b3c52..03fb39157e 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -19,7 +19,11 @@
 
 package org.apache.iotdb.db.it.utils;
 
+import org.apache.iotdb.isession.SessionDataSet;
 import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
 
 import org.junit.Assert;
 
@@ -31,11 +35,13 @@ import java.sql.Statement;
 import java.text.DateFormat;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import static org.apache.iotdb.itbase.constant.TestConstant.DELTA;
 import static org.apache.iotdb.itbase.constant.TestConstant.NULL;
+import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -268,4 +274,47 @@ public class TestUtils {
       fail(e.getMessage());
     }
   }
+
+  /**
+   * Asserts that a session query result has the expected column names and rows.
+   *
+   * <p>Each row is rendered with {@code RowRecord.toString()}, tabs are replaced by commas, and
+   * the result is compared, in order, with the corresponding entry of {@code expectedRetArray}.
+   *
+   * @param actualResultSet the data set to check; it is iterated but not closed here
+   * @param expectedColumnNames the expected column names; when {@code ignoreTimeStamp} is false
+   *     they are matched against the columns that follow the first one
+   * @param expectedRetArray the expected rows, in result order
+   * @param ignoreTimeStamp if true, the whole header is compared as-is; if false, the first
+   *     column must be {@code TIMESTAMP_STR} and is left out of the name comparison
+   */
+  public static void assertResultSetEqual(
+      SessionDataSet actualResultSet,
+      List<String> expectedColumnNames,
+      String[] expectedRetArray,
+      boolean ignoreTimeStamp) {
+    try {
+      List<String> actualColumnNames = actualResultSet.getColumnNames();
+      if (ignoreTimeStamp) {
+        assertEquals(expectedColumnNames, actualColumnNames);
+      } else {
+        assertEquals(TIMESTAMP_STR, actualColumnNames.get(0));
+        assertEquals(expectedColumnNames, actualColumnNames.subList(1, actualColumnNames.size()));
+      }
+
+      int count = 0;
+      while (actualResultSet.hasNext()) {
+        RowRecord rowRecord = actualResultSet.next();
+        // Report surplus rows as a test failure rather than an ArrayIndexOutOfBoundsException.
+        if (count >= expectedRetArray.length) {
+          fail("Result set contains more rows than the expected " + expectedRetArray.length);
+        }
+        assertEquals(expectedRetArray[count++], rowRecord.toString().replace('\t', ','));
+      }
+      assertEquals(expectedRetArray.length, count);
+    } catch (IoTDBConnectionException | StatementExecutionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionQueryIT.java
new file mode 100644
index 0000000000..20f4f4057d
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionQueryIT.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.it;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.it.utils.AlignedWriteUtil;
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.isession.util.Aggregation;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.assertResultSetEqual;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBSessionQueryIT {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/");
+    EnvFactory.getEnv().initBeforeClass();
+    AlignedWriteUtil.insertDataWithSession();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterClass();
+  }
+
+  // ------------------------------ Raw Data Query ----------------------------------
+  @Test
+  public void rawDataQueryWithTimeRangeTest1() throws IoTDBConnectionException {
+    String[] retArray =
+        new String[] {
+          "16,16.0,null,null",
+          "17,17.0,null,null",
+          "18,18.0,null,null",
+          "19,19.0,null,null",
+          "20,20.0,null,null",
+          "21,null,true,null",
+          "22,null,true,null",
+          "23,230000.0,false,null",
+          "24,null,true,null",
+          "25,null,true,null",
+          "26,null,false,null",
+          "27,null,false,null",
+          "28,null,false,null",
+          "29,null,false,null",
+          "30,null,false,null",
+          "31,null,null,aligned_test31",
+          "32,null,null,aligned_test32",
+          "33,null,null,aligned_test33",
+          "34,null,null,aligned_test34",
+        };
+
+    List<String> columnNames = Arrays.asList("root.sg1.d1.s1", "root.sg1.d1.s4", "root.sg1.d1.s5");
+
+    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      try (SessionDataSet resultSet = session.executeRawDataQuery(columnNames, 16, 35)) {
+        assertResultSetEqual(resultSet, columnNames, retArray, false);
+      }
+    } catch (StatementExecutionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void rawDataQueryWithTimeRangeTest2() throws IoTDBConnectionException {
+    String[] retArray =
+        new String[] {
+          "16,null,null,16.0,null,null,16.0",
+          "17,null,null,17.0,null,null,17.0",
+          "18,null,null,18.0,null,null,18.0",
+          "19,null,null,19.0,null,null,19.0",
+          "20,null,null,20.0,null,null,20.0",
+          "21,null,true,null,null,true,null",
+          "22,null,true,null,null,true,null",
+          "23,null,false,null,null,true,230000.0",
+          "24,null,true,null,null,true,null",
+          "25,null,true,null,null,true,null",
+          "26,null,false,null,null,false,null",
+          "27,null,false,null,null,false,null",
+          "28,null,false,null,null,false,null",
+          "29,null,false,null,null,false,null",
+          "30,null,false,null,null,false,null",
+          "31,non_aligned_test31,null,null,aligned_test31,null,null",
+          "32,non_aligned_test32,null,null,aligned_test32,null,null",
+          "33,non_aligned_test33,null,null,aligned_test33,null,null",
+          "34,non_aligned_test34,null,null,aligned_test34,null,null",
+        };
+
+    List<String> columnNames =
+        Arrays.asList(
+            "root.sg1.d2.s5",
+            "root.sg1.d1.s4",
+            "root.sg1.d2.s1",
+            "root.sg1.d1.s5",
+            "root.sg1.d2.s4",
+            "root.sg1.d1.s1");
+
+    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      try (SessionDataSet resultSet = session.executeRawDataQuery(columnNames, 16, 35)) {
+        assertResultSetEqual(resultSet, columnNames, retArray, false);
+      }
+    } catch (StatementExecutionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  // --------------------------------- Last Query ------------------------------------
+  private final List<String> lastQueryColumnNames =
+      Arrays.asList("Time", "Timeseries", "Value", "DataType");
+
+  @Test
+  public void lastQueryTest1() throws IoTDBConnectionException {
+    String[] retArray =
+        new String[] {
+          "23,root.sg1.d1.s1,230000.0,FLOAT",
+          "40,root.sg1.d1.s2,40,INT32",
+          "30,root.sg1.d1.s3,30,INT64",
+          "30,root.sg1.d1.s4,false,BOOLEAN",
+          "40,root.sg1.d1.s5,aligned_test40,TEXT"
+        };
+
+    List<String> selectedPaths =
+        Arrays.asList(
+            "root.sg1.d1.s1",
+            "root.sg1.d1.s2",
+            "root.sg1.d1.s3",
+            "root.sg1.d1.s4",
+            "root.sg1.d1.s5");
+
+    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      try (SessionDataSet resultSet = session.executeLastDataQuery(selectedPaths)) {
+        assertResultSetEqual(resultSet, lastQueryColumnNames, retArray, true);
+      }
+    } catch (StatementExecutionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void lastQueryTest2() throws IoTDBConnectionException {
+    String[] retArray =
+        new String[] {
+          "23,root.sg1.d1.s1,230000.0,FLOAT",
+          "30,root.sg1.d1.s4,false,BOOLEAN",
+          "40,root.sg1.d1.s5,aligned_test40,TEXT"
+        };
+
+    List<String> selectedPaths =
+        Arrays.asList("root.sg1.d1.s1", "root.sg1.d1.s4", "root.sg1.d1.s5");
+
+    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      try (SessionDataSet resultSet = session.executeLastDataQuery(selectedPaths)) {
+        assertResultSetEqual(resultSet, lastQueryColumnNames, retArray, true);
+      }
+    } catch (StatementExecutionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void lastQueryWithLastTimeTest1() throws IoTDBConnectionException {
+    String[] retArray =
+        new String[] {"40,root.sg1.d1.s2,40,INT32", "40,root.sg1.d1.s5,aligned_test40,TEXT"};
+
+    List<String> selectedPaths = Arrays.asList("root.sg1.d1.s2", "root.sg1.d1.s5");
+
+    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      try (SessionDataSet resultSet = session.executeLastDataQuery(selectedPaths, 30)) {
+        assertResultSetEqual(resultSet, lastQueryColumnNames, retArray, true);
+      }
+    } catch (StatementExecutionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void lastQueryWithLastTimeTest2() throws IoTDBConnectionException {
+    String[] retArray =
+        new String[] {
+          "40,root.sg1.d1.s5,aligned_test40,TEXT", "40,root.sg1.d2.s5,non_aligned_test40,TEXT"
+        };
+
+    List<String> selectedPaths = Arrays.asList("root.sg1.d1.s5", "root.sg1.d2.s5");
+
+    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      try (SessionDataSet resultSet = session.executeLastDataQuery(selectedPaths, 30)) {
+        assertResultSetEqual(resultSet, lastQueryColumnNames, retArray, true);
+      }
+    } catch (StatementExecutionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  // ------------------------------ Aggregation Query ------------------------------
+  @Test
+  public void aggregationQueryTest() {
+    String[] retArray = new String[] {"0,20,29,28,19,20"};
+    List<String> paths =
+        Arrays.asList(
+            "root.sg1.d1.s1",
+            "root.sg1.d1.s2",
+            "root.sg1.d1.s3",
+            "root.sg1.d1.s4",
+            "root.sg1.d1.s5");
+    List<Aggregation> aggregations = Collections.nCopies(paths.size(), Aggregation.COUNT);
+
+    List<String> columnNames = new ArrayList<>();
+    for (int i = 0; i < paths.size(); i++) {
+      columnNames.add(String.format("%s(%s)", aggregations.get(i), paths.get(i)));
+    }
+
+    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      try (SessionDataSet resultSet = session.executeAggregationQuery(paths, aggregations)) {
+        assertResultSetEqual(resultSet, columnNames, retArray, true);
+      }
+    } catch (StatementExecutionException | IoTDBConnectionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void aggregationQueryWithTimeRangeTest() {
+    String[] retArray = new String[] {"0,12,15,22,13,6"};
+    List<String> paths =
+        Arrays.asList(
+            "root.sg1.d1.s1",
+            "root.sg1.d1.s2",
+            "root.sg1.d1.s3",
+            "root.sg1.d1.s4",
+            "root.sg1.d1.s5");
+    List<Aggregation> aggregations = Collections.nCopies(paths.size(), Aggregation.COUNT);
+
+    List<String> columnNames = new ArrayList<>();
+    for (int i = 0; i < paths.size(); i++) {
+      columnNames.add(String.format("%s(%s)", aggregations.get(i), paths.get(i)));
+    }
+
+    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      try (SessionDataSet resultSet = session.executeAggregationQuery(paths, aggregations, 9, 34)) {
+        assertResultSetEqual(resultSet, columnNames, retArray, true);
+      }
+    } catch (StatementExecutionException | IoTDBConnectionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  // ---------------------------- Group By Aggregation Query -------------------------
+  @Test
+  public void groupByQueryTest1() {
+    String[] retArray =
+        new String[] {"11,10,130142.0,13014.2", "21,1,null,230000.0", "31,0,355.0,null"};
+    List<String> paths = Arrays.asList("root.sg1.d1.s1", "root.sg1.d1.s2", "root.sg1.d1.s1");
+    List<Aggregation> aggregations =
+        Arrays.asList(Aggregation.COUNT, Aggregation.SUM, Aggregation.AVG);
+
+    List<String> columnNames = new ArrayList<>();
+    columnNames.add("Time");
+    for (int i = 0; i < paths.size(); i++) {
+      columnNames.add(String.format("%s(%s)", aggregations.get(i), paths.get(i)));
+    }
+
+    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      try (SessionDataSet resultSet =
+          session.executeAggregationQuery(paths, aggregations, 11, 41, 10)) {
+        assertResultSetEqual(resultSet, columnNames, retArray, true);
+      }
+    } catch (StatementExecutionException | IoTDBConnectionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void groupByQueryTest2() {
+    String[] retArray =
+        new String[] {
+          "7,3,34.0,8.0",
+          "13,4,130045.0,32511.25",
+          "19,2,39.0,19.5",
+          "25,0,null,null",
+          "31,0,130.0,null",
+          "37,0,154.0,null"
+        };
+    List<String> paths = Arrays.asList("root.sg1.d1.s1", "root.sg1.d1.s2", "root.sg1.d1.s1");
+    List<Aggregation> aggregations =
+        Arrays.asList(Aggregation.COUNT, Aggregation.SUM, Aggregation.AVG);
+
+    List<String> columnNames = new ArrayList<>();
+    columnNames.add("Time");
+    for (int i = 0; i < paths.size(); i++) {
+      columnNames.add(String.format("%s(%s)", aggregations.get(i), paths.get(i)));
+    }
+
+    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      try (SessionDataSet resultSet =
+          session.executeAggregationQuery(paths, aggregations, 7, 41, 4, 6)) {
+        assertResultSetEqual(resultSet, columnNames, retArray, true);
+      }
+    } catch (StatementExecutionException | IoTDBConnectionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void groupByQueryTest3() {
+    String[] retArray =
+        new String[] {
+          "6,9,130092.0,14453.555555555555",
+          "11,10,130142.0,13014.2",
+          "16,6,90.0,38348.333333333336",
+          "21,1,null,230000.0",
+          "26,0,165.0,null",
+          "31,0,355.0,null",
+          "36,0,190.0,null"
+        };
+    List<String> paths = Arrays.asList("root.sg1.d1.s1", "root.sg1.d1.s2", "root.sg1.d1.s1");
+    List<Aggregation> aggregations =
+        Arrays.asList(Aggregation.COUNT, Aggregation.SUM, Aggregation.AVG);
+
+    List<String> columnNames = new ArrayList<>();
+    columnNames.add("Time");
+    for (int i = 0; i < paths.size(); i++) {
+      columnNames.add(String.format("%s(%s)", aggregations.get(i), paths.get(i)));
+    }
+
+    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      try (SessionDataSet resultSet =
+          session.executeAggregationQuery(paths, aggregations, 6, 41, 10, 5)) {
+        assertResultSetEqual(resultSet, columnNames, retArray, true);
+      }
+    } catch (StatementExecutionException | IoTDBConnectionException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}


[iotdb] 03/05: implement server

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/querySession
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f576b91588998c6c4e8370685e4d6b017da996e4
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Thu Dec 29 16:06:31 2022 +0800

    implement server
---
 .../db/mpp/plan/parser/StatementGenerator.java     | 58 ++++++++++++++++++++
 .../statement/component/GroupByTimeComponent.java  |  2 +-
 .../service/thrift/impl/ClientRPCServiceImpl.java  | 62 ++++++++++++++++++++++
 3 files changed, 121 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index 5a82d4541f..bf1b78d75d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.plan.parser;
 
+import org.apache.iotdb.common.rpc.thrift.TPartialPath;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
@@ -32,8 +33,10 @@ import org.apache.iotdb.db.mpp.plan.expression.binary.LogicAndExpression;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.ConstantOperand;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimestampOperand;
+import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.component.FromComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
 import org.apache.iotdb.db.mpp.plan.statement.component.SelectComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
@@ -61,6 +64,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTempl
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParser;
 import org.apache.iotdb.db.qp.sql.SqlLexer;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
+import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
@@ -97,6 +101,7 @@ import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -174,6 +179,77 @@ public class StatementGenerator {
     return lastQueryStatement;
   }
 
+  /**
+   * Builds the query statement for an aggregation request received over RPC.
+   *
+   * <p>The i-th aggregation function is applied to the i-th path. If the request carries an
+   * interval, a group-by-time component is created (the sliding step defaults to the interval);
+   * otherwise, if it carries a start time, a {@code startTime <= time < endTime} filter is added.
+   *
+   * @param req the aggregation request
+   * @param zoneId the zone id passed to the select component
+   * @return the generated query statement
+   * @throws IllegalArgumentException if the numbers of paths and aggregations differ
+   */
+  public static Statement createStatement(TSAggregationQueryReq req, ZoneId zoneId) {
+    List<TPartialPath> paths = req.getPaths();
+    List<String> aggregations = req.getAggregations();
+    // Every aggregation is paired with the path at the same index, so the lists must line up.
+    if (paths.size() != aggregations.size()) {
+      throw new IllegalArgumentException(
+          String.format(
+              "The number of paths (%d) does not match the number of aggregations (%d)",
+              paths.size(), aggregations.size()));
+    }
+
+    QueryStatement queryStatement = new QueryStatement();
+
+    FromComponent fromComponent = new FromComponent();
+    fromComponent.addPrefixPath(new PartialPath("", false));
+    queryStatement.setFromComponent(fromComponent);
+
+    SelectComponent selectComponent = new SelectComponent(zoneId);
+    for (int i = 0; i < aggregations.size(); i++) {
+      PartialPath selectPath = new PartialPath(paths.get(i).getNodes().toArray(new String[0]));
+      selectComponent.addResultColumn(
+          new ResultColumn(
+              new FunctionExpression(
+                  aggregations.get(i),
+                  new LinkedHashMap<>(),
+                  Collections.singletonList(new TimeSeriesOperand(selectPath))),
+              ResultColumn.ColumnType.AGGREGATION));
+    }
+    queryStatement.setSelectComponent(selectComponent);
+
+    if (req.isSetInterval()) {
+      GroupByTimeComponent groupByTimeComponent = new GroupByTimeComponent();
+      groupByTimeComponent.setStartTime(req.getStartTime());
+      groupByTimeComponent.setEndTime(req.getEndTime());
+      groupByTimeComponent.setInterval(req.getInterval());
+      if (req.isSetSlidingStep()) {
+        groupByTimeComponent.setSlidingStep(req.getSlidingStep());
+      } else {
+        groupByTimeComponent.setSlidingStep(req.getInterval());
+      }
+      queryStatement.setGroupByTimeComponent(groupByTimeComponent);
+    } else if (req.isSetStartTime()) {
+      WhereCondition whereCondition = new WhereCondition();
+      GreaterEqualExpression leftPredicate =
+          new GreaterEqualExpression(
+              new TimestampOperand(),
+              new ConstantOperand(TSDataType.INT64, Long.toString(req.getStartTime())));
+      LessThanExpression rightPredicate =
+          new LessThanExpression(
+              new TimestampOperand(),
+              new ConstantOperand(TSDataType.INT64, Long.toString(req.getEndTime())));
+      LogicAndExpression predicate = new LogicAndExpression(leftPredicate, rightPredicate);
+      whereCondition.setPredicate(predicate);
+      queryStatement.setWhereCondition(whereCondition);
+    }
+
+    return queryStatement;
+  }
+
   public static Statement createStatement(TSInsertRecordReq insertRecordReq)
       throws IllegalPathException, QueryProcessException {
     // construct insert statement
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByTimeComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByTimeComponent.java
index d17a544fbf..fc5d8a5fdc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByTimeComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByTimeComponent.java
@@ -39,7 +39,7 @@ public class GroupByTimeComponent extends StatementNode {
   private boolean isSlidingStepByMonth = false;
 
   // if it is left close and right open interval
-  private boolean leftCRightO;
+  private boolean leftCRightO = true;
 
   public GroupByTimeComponent() {}
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 29d07a462d..4c843b1ae3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -68,6 +68,7 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.ServerProperties;
+import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
 import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
@@ -388,6 +389,67 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
     return executeLastDataQueryInternal(req, SELECT_RESULT);
   }
 
+  @Override
+  public TSExecuteStatementResp executeAggregationQuery(TSAggregationQueryReq req)
+      throws TException {
+    boolean finished = false;
+    long queryId = Long.MIN_VALUE;
+    if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+    long startTime = System.currentTimeMillis();
+    try {
+      Statement s =
+          StatementGenerator.createStatement(req, SESSION_MANAGER.getCurrSession().getZoneId());
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(s, SESSION_MANAGER.getCurrSession());
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(status);
+      }
+
+      queryId = SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.execute(
+              s,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER,
+              req.getTimeout());
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new RuntimeException("error code: " + result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution.isQuery()) {
+          resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+          resp.setStatus(result.status);
+          finished = SELECT_RESULT.apply(resp, queryExecution, req.fetchSize);
+          resp.setMoreData(!finished);
+        } else {
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+
+    } catch (Exception e) {
+      finished = true;
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY));
+    } finally {
+      addOperationLatency(Operation.EXECUTE_QUERY, startTime);
+      if (finished) {
+        COORDINATOR.cleanupQueryExecution(queryId);
+      }
+    }
+  }
+
   @Override
   public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) {
     long startTime = System.currentTimeMillis();


[iotdb] 02/05: add interface in ISession & ISessionPool

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/querySession
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a0db9be0fb37bc54a181a08b37ee589cc9acb381
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Dec 28 16:38:16 2022 +0800

    add interface in ISession & ISessionPool
---
 .../java/org/apache/iotdb/isession/ISession.java   | 21 +++++++++++++
 .../apache/iotdb/isession/pool/ISessionPool.java   | 23 ++++++++++++++-
 .../apache/iotdb/isession/util/Aggregation.java    | 33 +++++++++++++++++++++
 .../java/org/apache/iotdb/session/Session.java     | 34 ++++++++++++++++++++++
 .../org/apache/iotdb/session/pool/SessionPool.java | 34 ++++++++++++++++++++++
 5 files changed, 144 insertions(+), 1 deletion(-)

diff --git a/isession/src/main/java/org/apache/iotdb/isession/ISession.java b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 59b2f072bc..3b26c5a89e 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.isession;
 
 import org.apache.iotdb.isession.template.Template;
+import org.apache.iotdb.isession.util.Aggregation;
 import org.apache.iotdb.isession.util.SystemStatus;
 import org.apache.iotdb.isession.util.Version;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
@@ -157,6 +158,26 @@ public interface ISession extends AutoCloseable {
   SessionDataSet executeLastDataQuery(List<String> paths)
       throws StatementExecutionException, IoTDBConnectionException;
 
+  SessionDataSet executeAggregationQuery(List<String> paths, List<Aggregation> aggregations);
+
+  SessionDataSet executeAggregationQuery(
+      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime);
+
+  SessionDataSet executeAggregationQuery(
+      List<String> paths,
+      List<Aggregation> aggregations,
+      long startTime,
+      long endTime,
+      long interval);
+
+  SessionDataSet executeAggregationQuery(
+      List<String> paths,
+      List<Aggregation> aggregations,
+      long startTime,
+      long endTime,
+      long interval,
+      long slidingStep);
+
   void insertRecord(
       String deviceId,
       long time,
diff --git a/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java b/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
index 020f98eb2e..0367b72b97 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
@@ -18,7 +18,9 @@
  */
 package org.apache.iotdb.isession.pool;
 
+import org.apache.iotdb.isession.SessionDataSet;
 import org.apache.iotdb.isession.template.Template;
+import org.apache.iotdb.isession.util.Aggregation;
 import org.apache.iotdb.isession.util.SystemStatus;
 import org.apache.iotdb.isession.util.Version;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
@@ -407,7 +409,6 @@ public interface ISessionPool {
   void executeNonQueryStatement(String sql)
       throws StatementExecutionException, IoTDBConnectionException;
 
-  @SuppressWarnings("squid:S2095") // Suppress wrapper not closed warning
   SessionDataSetWrapper executeRawDataQuery(
       List<String> paths, long startTime, long endTime, long timeOut)
       throws IoTDBConnectionException, StatementExecutionException;
@@ -418,6 +419,26 @@ public interface ISessionPool {
   SessionDataSetWrapper executeLastDataQuery(List<String> paths)
       throws StatementExecutionException, IoTDBConnectionException;
 
+  SessionDataSet executeAggregationQuery(List<String> paths, List<Aggregation> aggregations);
+
+  SessionDataSet executeAggregationQuery(
+      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime);
+
+  SessionDataSet executeAggregationQuery(
+      List<String> paths,
+      List<Aggregation> aggregations,
+      long startTime,
+      long endTime,
+      long interval);
+
+  SessionDataSet executeAggregationQuery(
+      List<String> paths,
+      List<Aggregation> aggregations,
+      long startTime,
+      long endTime,
+      long interval,
+      long slidingStep);
+
   int getMaxSize();
 
   String getHost();
diff --git a/isession/src/main/java/org/apache/iotdb/isession/util/Aggregation.java b/isession/src/main/java/org/apache/iotdb/isession/util/Aggregation.java
new file mode 100644
index 0000000000..264032cf02
--- /dev/null
+++ b/isession/src/main/java/org/apache/iotdb/isession/util/Aggregation.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.isession.util;
+
+public enum Aggregation {
+  COUNT,
+  AVG,
+  SUM,
+  FIRST_VALUE,
+  LAST_VALUE,
+  MAX_TIME,
+  MIN_TIME,
+  MAX_VALUE,
+  MIN_VALUE,
+  EXTREME;
+}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index f41401d5ad..196b98f2ef 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.isession.SessionConfig;
 import org.apache.iotdb.isession.SessionDataSet;
 import org.apache.iotdb.isession.template.Template;
+import org.apache.iotdb.isession.util.Aggregation;
 import org.apache.iotdb.isession.util.Version;
 import org.apache.iotdb.rpc.BatchExecutionException;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
@@ -837,6 +838,39 @@ public class Session implements ISession {
     return executeLastDataQuery(paths, time, queryTimeoutInMs);
   }
 
+  @Override
+  public SessionDataSet executeAggregationQuery(
+      List<String> paths, List<Aggregation> aggregations) {
+    return null;
+  }
+
+  @Override
+  public SessionDataSet executeAggregationQuery(
+      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime) {
+    return null;
+  }
+
+  @Override
+  public SessionDataSet executeAggregationQuery(
+      List<String> paths,
+      List<Aggregation> aggregations,
+      long startTime,
+      long endTime,
+      long interval) {
+    return null;
+  }
+
+  @Override
+  public SessionDataSet executeAggregationQuery(
+      List<String> paths,
+      List<Aggregation> aggregations,
+      long startTime,
+      long endTime,
+      long interval,
+      long slidingStep) {
+    return null;
+  }
+
   /**
    * insert data in one row, if you want to improve your performance, please use insertRecords
    * method or insertTablet method
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 661193a731..89590cb81f 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.isession.SessionDataSet;
 import org.apache.iotdb.isession.pool.ISessionPool;
 import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
 import org.apache.iotdb.isession.template.Template;
+import org.apache.iotdb.isession.util.Aggregation;
 import org.apache.iotdb.isession.util.Version;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
@@ -2554,6 +2555,39 @@ public class SessionPool implements ISessionPool {
     return null;
   }
 
+  @Override
+  public SessionDataSet executeAggregationQuery(
+      List<String> paths, List<Aggregation> aggregations) {
+    return null;
+  }
+
+  @Override
+  public SessionDataSet executeAggregationQuery(
+      List<String> paths, List<Aggregation> aggregations, long startTime, long endTime) {
+    return null;
+  }
+
+  @Override
+  public SessionDataSet executeAggregationQuery(
+      List<String> paths,
+      List<Aggregation> aggregations,
+      long startTime,
+      long endTime,
+      long interval) {
+    return null;
+  }
+
+  @Override
+  public SessionDataSet executeAggregationQuery(
+      List<String> paths,
+      List<Aggregation> aggregations,
+      long startTime,
+      long endTime,
+      long interval,
+      long slidingStep) {
+    return null;
+  }
+
   @Override
   public int getMaxSize() {
     return maxSize;