You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ch...@apache.org on 2021/06/03 08:40:38 UTC

[iotdb] branch master updated: add query last data interface (#3219)

This is an automated email from the ASF dual-hosted git repository.

chaow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 17f10e3  add query last data interface  (#3219)
17f10e3 is described below

commit 17f10e30db3dd2f2d76ba69596d179c1f4f873d6
Author: Hang Ji <55...@users.noreply.github.com>
AuthorDate: Thu Jun 3 16:40:10 2021 +0800

    add query last data interface  (#3219)
    
    Co-authored-by: Xiangwei Wei <34...@users.noreply.github.com>
---
 .../apache/iotdb/cluster/query/ClusterPlanner.java | 16 ++++++
 docker/src/main/Dockerfile-cluster                 |  5 +-
 docker/src/main/Dockerfile-single                  |  5 +-
 .../main/java/org/apache/iotdb/SessionExample.java | 15 +++++
 .../main/java/org/apache/iotdb/db/qp/Planner.java  | 15 +++++
 .../iotdb/db/qp/strategy/LogicalGenerator.java     | 29 ++++++++++
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 32 +++++++++++
 .../java/org/apache/iotdb/db/qp/PlannerTest.java   | 16 ++++++
 .../java/org/apache/iotdb/session/Session.java     | 38 ++++++++++++
 .../apache/iotdb/session/SessionConnection.java    | 39 +++++++++++++
 .../iotdb/session/IoTDBSessionComplexIT.java       | 33 +++++++++++
 .../test/java/org/apache/iotdb/db/sql/Cases.java   | 67 ++++++++++++++++++++++
 thrift/src/main/thrift/rpc.thrift                  | 11 ++++
 13 files changed, 317 insertions(+), 4 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java
index a2469ac..f83a2de 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.strategy.LogicalChecker;
 import org.apache.iotdb.db.qp.strategy.LogicalGenerator;
+import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
 
 import java.time.ZoneId;
@@ -60,4 +61,19 @@ public class ClusterPlanner extends Planner {
     return new ClusterPhysicalGenerator()
         .transformToPhysicalPlan(operator, rawDataQueryReq.fetchSize);
   }
+
+  @Override
+  public PhysicalPlan lastDataQueryReqToPhysicalPlan(
+      TSLastDataQueryReq lastDataQueryReq, ZoneId zoneId)
+      throws QueryProcessException, IllegalPathException {
+    // from TSLastDataQueryReq to logical operator
+    Operator operator = LogicalGenerator.generate(lastDataQueryReq, zoneId);
+    // check if there are logical errors
+    LogicalChecker.check(operator);
+    // optimize the logical operator
+    operator = logicalOptimize(operator, lastDataQueryReq.fetchSize);
+    // from logical operator to physical plan
+    return new ClusterPhysicalGenerator()
+        .transformToPhysicalPlan(operator, lastDataQueryReq.fetchSize);
+  }
 }
diff --git a/docker/src/main/Dockerfile-cluster b/docker/src/main/Dockerfile-cluster
index 277132b..abc324e 100644
--- a/docker/src/main/Dockerfile-cluster
+++ b/docker/src/main/Dockerfile-cluster
@@ -24,7 +24,7 @@ FROM openjdk:11-jre-slim
 ADD distribution/target/apache-iotdb-*-cluster-bin.zip /
 
 RUN apt update \
-  && apt install lsof procps unzip -y \
+  && apt install lsof dos2unix procps unzip -y \
   && unzip /apache-iotdb-*-bin.zip -d / \
   && rm /apache-iotdb-*-bin.zip \
   && mv /apache-iotdb-* /iotdb \
@@ -32,7 +32,8 @@ RUN apt update \
   && apt autoremove -y \
   && apt purge --auto-remove -y \
   && apt clean -y
-
+RUN dos2unix /iotdb/sbin/start-node.sh
+RUN dos2unix /iotdb/sbin/../conf/iotdb-env.sh
 EXPOSE 6667
 EXPOSE 31999
 EXPOSE 5555
diff --git a/docker/src/main/Dockerfile-single b/docker/src/main/Dockerfile-single
index c0e00a9..f283987 100644
--- a/docker/src/main/Dockerfile-single
+++ b/docker/src/main/Dockerfile-single
@@ -24,7 +24,7 @@ FROM openjdk:11-jre-slim
 ADD distribution/target/apache-iotdb-*-server-bin.zip /
 
 RUN apt update \
-  && apt install lsof procps unzip -y \
+  && apt install lsof dos2unix procps unzip -y \
   && unzip /apache-iotdb-*-bin.zip -d / \
   && rm /apache-iotdb-*-bin.zip \
   && mv /apache-iotdb-* /iotdb \
@@ -32,7 +32,8 @@ RUN apt update \
   && apt autoremove -y \
   && apt purge --auto-remove -y \
   && apt clean -y
-
+RUN dos2unix /iotdb/sbin/start-server.sh
+RUN dos2unix /iotdb/sbin/../conf/iotdb-env.sh
 EXPOSE 6667
 EXPOSE 31999
 EXPOSE 5555
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index d996d10..50b85d9 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -77,6 +77,7 @@ public class SessionExample {
     query();
     queryWithTimeout();
     rawDataQuery();
+    lastDataQuery();
     queryByIterator();
     deleteData();
     deleteTimeseries();
@@ -577,6 +578,20 @@ public class SessionExample {
     dataSet.closeOperationHandle();
   }
 
+  private static void lastDataQuery() throws IoTDBConnectionException, StatementExecutionException {
+    List<String> paths = new ArrayList<>();
+    paths.add(ROOT_SG1_D1_S1);
+    paths.add(ROOT_SG1_D1_S2);
+    paths.add(ROOT_SG1_D1_S3);
+    SessionDataSet sessionDataSet = session.executeLastDataQuery(paths, 3);
+    System.out.println(sessionDataSet.getColumnNames());
+    sessionDataSet.setFetchSize(1024);
+    while (sessionDataSet.hasNext()) {
+      System.out.println(sessionDataSet.next());
+    }
+    sessionDataSet.closeOperationHandle();
+  }
+
   private static void queryByIterator()
       throws IoTDBConnectionException, StatementExecutionException {
     SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1");
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
index db4722a..1221fed 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.qp.strategy.optimizer.DnfFilterOptimizer;
 import org.apache.iotdb.db.qp.strategy.optimizer.MergeSingleFilterOptimizer;
 import org.apache.iotdb.db.qp.strategy.optimizer.RemoveNotOptimizer;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
 
 import java.time.ZoneId;
@@ -75,6 +76,20 @@ public class Planner {
     return new PhysicalGenerator().transformToPhysicalPlan(operator, rawDataQueryReq.fetchSize);
   }
 
+  /** convert last data query to physical plan directly */
+  public PhysicalPlan lastDataQueryReqToPhysicalPlan(
+      TSLastDataQueryReq lastDataQueryReq, ZoneId zoneId)
+      throws QueryProcessException, IllegalPathException {
+    // from TSLastDataQueryReq to logical operator
+    Operator operator = LogicalGenerator.generate(lastDataQueryReq, zoneId);
+    // check if there are logical errors
+    LogicalChecker.check(operator);
+    // optimize the logical operator
+    operator = logicalOptimize(operator, lastDataQueryReq.fetchSize);
+    // from logical operator to physical plan
+    return new PhysicalGenerator().transformToPhysicalPlan(operator, lastDataQueryReq.fetchSize);
+  }
+
   /**
    * given an unoptimized logical operator tree and return a optimized result.
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
index 51a5e72..0cdfc38 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.qp.sql.SqlBaseLexer;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser;
 import org.apache.iotdb.db.query.expression.ResultColumn;
 import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
+import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
 
 import org.antlr.v4.runtime.CharStream;
@@ -123,5 +124,33 @@ public class LogicalGenerator {
     return queryOp;
   }
 
+  public static Operator generate(TSLastDataQueryReq req, ZoneId zoneId)
+      throws IllegalPathException {
+    // construct query operator and set its global time filter
+    SelectOperator selectOp = new SelectOperator(SQLConstant.TOK_SELECT, zoneId);
+    FromOperator fromOp = new FromOperator(SQLConstant.TOK_FROM);
+    QueryOperator queryOp = new QueryOperator(SQLConstant.TOK_QUERY);
+
+    selectOp.addResultColumn(new ResultColumn(new TimeSeriesOperand(new PartialPath(""))));
+    selectOp.markAsLastQuery();
+
+    for (String p : req.getPaths()) {
+      PartialPath path = new PartialPath(p);
+      fromOp.addPrefixTablePath(path);
+    }
+
+    queryOp.setSelectOperator(selectOp);
+    queryOp.setFromOperator(fromOp);
+
+    PartialPath timePath = new PartialPath(TIME);
+
+    BasicFunctionOperator basicFunctionOperator =
+        new BasicFunctionOperator(
+            SQLConstant.GREATERTHANOREQUALTO, timePath, Long.toString(req.getTime()));
+    queryOp.setFilterOperator(basicFunctionOperator);
+
+    return queryOp;
+  }
+
   private LogicalGenerator() {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index fc8c344..f09454e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -117,6 +117,7 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
+import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
 import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
 import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
@@ -741,6 +742,37 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
+  @Override
+  public TSExecuteStatementResp executeLastDataQuery(TSLastDataQueryReq req) throws TException {
+    try {
+      if (!checkLogin(req.getSessionId())) {
+        return RpcUtils.getTSExecuteStatementResp(TSStatusCode.NOT_LOGIN_ERROR);
+      }
+
+      PhysicalPlan physicalPlan =
+          processor.lastDataQueryReqToPhysicalPlan(req, sessionIdZoneIdMap.get(req.getSessionId()));
+      return physicalPlan.isQuery()
+          ? internalExecuteQueryStatement(
+              "",
+              req.statementId,
+              physicalPlan,
+              req.fetchSize,
+              config.getQueryTimeoutThreshold(),
+              sessionIdUsernameMap.get(req.getSessionId()),
+              req.isEnableRedirectQuery())
+          : RpcUtils.getTSExecuteStatementResp(
+              TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+    } catch (InterruptedException e) {
+      LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
+      Thread.currentThread().interrupt();
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "executing lastDataQueryReqToPhysicalPlan"));
+    } catch (Exception e) {
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "executing lastDataQueryReqToPhysicalPlan"));
+    }
+  }
+
   /**
    * @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByTimePlan, UDFPlan,
    *     some AuthorPlan
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/PlannerTest.java b/server/src/test/java/org/apache/iotdb/db/qp/PlannerTest.java
index 508ebe4..e1e5386 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/PlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/PlannerTest.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -253,4 +254,19 @@ public class PlannerTest {
     assertEquals(paths.get(0), physicalPlan.getPaths().get(0).getFullPath());
     assertEquals(paths.get(1), physicalPlan.getPaths().get(1).getFullPath());
   }
+
+  @Test
+  public void lastDataQueryReqToPhysicalPlanTest()
+      throws QueryProcessException, IllegalPathException {
+    TSLastDataQueryReq tsLastDataQueryReq = new TSLastDataQueryReq();
+    List<String> paths = new ArrayList<>();
+    paths.add("root.vehicle.device1.sensor1");
+    tsLastDataQueryReq.setPaths(paths);
+    tsLastDataQueryReq.setTime(0);
+    tsLastDataQueryReq.setFetchSize(1000);
+    PhysicalPlan physicalPlan =
+        processor.lastDataQueryReqToPhysicalPlan(tsLastDataQueryReq, ZoneId.of("Asia/Shanghai"));
+    assertEquals(OperatorType.LAST, physicalPlan.getOperatorType());
+    assertEquals(paths.get(0), physicalPlan.getPaths().get(0).getFullPath());
+  }
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 6fc4ab4..f9e4c9c 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -590,6 +590,44 @@ public class Session {
   }
 
   /**
+   * Only supports queries like: select last status from root.ln.d1.s1 where time >= 1621326244168;
+   *
+   * @param paths timeseries paths, e.g. root.ln.d1.s1, root.ln.d1.s2
+   * @param LastTime lower time bound; only last data whose timestamp is greater than or equal
+   *     to LastTime is returned, e.g. 1621326244168
+   */
+  public SessionDataSet executeLastDataQuery(List<String> paths, long LastTime)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return defaultSessionConnection.executeLastDataQuery(paths, LastTime);
+    } catch (RedirectException e) {
+      handleQueryRedirection(e.getEndPoint());
+      if (enableQueryRedirection) {
+        // retry
+        try {
+          return defaultSessionConnection.executeLastDataQuery(paths, LastTime);
+        } catch (RedirectException redirectException) {
+          logger.error("redirect twice", redirectException);
+          throw new StatementExecutionException("redirect twice, please try again.");
+        }
+      } else {
+        throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
+      }
+    }
+  }
+
+  /**
+   * query eg. select last status from root.ln.wf01.wt01; <PrefixPath> + <suffixPath> = <TimeSeries>
+   *
+   * @param paths timeseries paths, e.g. root.ln.d1.s1, root.ln.d1.s2
+   */
+  public SessionDataSet executeLastDataQuery(List<String> paths)
+      throws StatementExecutionException, IoTDBConnectionException {
+    long time = 0L;
+    return executeLastDataQuery(paths, time);
+  }
+
+  /**
    * insert data in one row, if you want to improve your performance, please use insertRecords
    * method or insertTablet method
    *
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 402523d..361c87b 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
+import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
 import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
 import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
@@ -394,6 +395,44 @@ public class SessionConnection {
         execResp.isIgnoreTimeStamp());
   }
 
+  protected SessionDataSet executeLastDataQuery(List<String> paths, long time)
+      throws StatementExecutionException, IoTDBConnectionException, RedirectException {
+    TSLastDataQueryReq tsLastDataQueryReq =
+        new TSLastDataQueryReq(sessionId, paths, time, statementId);
+    tsLastDataQueryReq.setFetchSize(session.fetchSize);
+    tsLastDataQueryReq.setEnableRedirectQuery(enableRedirect);
+    TSExecuteStatementResp tsExecuteStatementResp;
+    try {
+      tsExecuteStatementResp = client.executeLastDataQuery(tsLastDataQueryReq);
+      RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
+    } catch (TException e) {
+      if (reconnect()) {
+        try {
+          tsLastDataQueryReq.setSessionId(sessionId);
+          tsLastDataQueryReq.setStatementId(statementId);
+          tsExecuteStatementResp = client.executeLastDataQuery(tsLastDataQueryReq);
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+      }
+    }
+
+    RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
+    return new SessionDataSet(
+        "",
+        tsExecuteStatementResp.getColumns(),
+        tsExecuteStatementResp.getDataTypeList(),
+        tsExecuteStatementResp.columnNameIndexMap,
+        tsExecuteStatementResp.getQueryId(),
+        statementId,
+        client,
+        sessionId,
+        tsExecuteStatementResp.queryDataSet,
+        tsExecuteStatementResp.isIgnoreTimeStamp());
+  }
+
   protected void insertRecord(TSInsertRecordReq request)
       throws IoTDBConnectionException, StatementExecutionException, RedirectException {
     request.setSessionId(sessionId);
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
index 281c669..6b1234c 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
@@ -292,6 +292,20 @@ public class IoTDBSessionComplexIT {
   }
 
   @Test
+  public void testLastDataQuery() throws IoTDBConnectionException, StatementExecutionException {
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+
+    session.setStorageGroup("root.sg1");
+
+    createTimeseries();
+
+    insertRecords();
+
+    lastDataQuery();
+  }
+
+  @Test
   public void test()
       throws ClassNotFoundException, SQLException, IoTDBConnectionException,
           StatementExecutionException {
@@ -594,6 +608,25 @@ public class IoTDBSessionComplexIT {
     sessionDataSet.closeOperationHandle();
   }
 
+  private void lastDataQuery() throws StatementExecutionException, IoTDBConnectionException {
+    List<String> paths = new ArrayList<>();
+
+    paths.add("root.sg1.d1.s1");
+    paths.add("root.sg1.d2.s1");
+
+    SessionDataSet sessionDataSet = session.executeLastDataQuery(paths);
+    sessionDataSet.setFetchSize(1024);
+
+    int count = 0;
+    while (sessionDataSet.hasNext()) {
+      count++;
+      List<Field> fields = sessionDataSet.next().getFields();
+      Assert.assertEquals("[root.sg1.d2.s1, 1]", fields.toString());
+    }
+    Assert.assertEquals(1, count);
+    sessionDataSet.closeOperationHandle();
+  }
+
   private void insertTablet(String deviceId)
       throws IoTDBConnectionException, StatementExecutionException {
 
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
index 17e4f53..67d355c 100644
--- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.session.SessionDataSet;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
@@ -245,4 +246,70 @@ public abstract class Cases {
     Assert.assertEquals(10, next.getFields().get(0).getLongV());
     Assert.assertEquals(10, next.getFields().get(1).getLongV());
   }
+
+  @Test
+  public void clusterLastQueryTest() throws IoTDBConnectionException, StatementExecutionException {
+
+    session.setStorageGroup("root.sg1");
+    session.createTimeseries(
+        "root.sg1.d1.s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
+    session.createTimeseries(
+        "root.sg1.d2.s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
+
+    insertRecords();
+
+    List<String> paths = new ArrayList<>();
+
+    paths.add("root.sg1.d1.s1");
+    paths.add("root.sg1.d2.s1");
+
+    SessionDataSet sessionDataSet = session.executeLastDataQuery(paths);
+    sessionDataSet.setFetchSize(1024);
+
+    int count = 0;
+    while (sessionDataSet.hasNext()) {
+      count++;
+      List<Field> fields = sessionDataSet.next().getFields();
+      Assert.assertEquals("[root.sg1.d1.s1, 1]", fields.toString());
+    }
+    Assert.assertEquals(1, count);
+    sessionDataSet.closeOperationHandle();
+  }
+
+  private void insertRecords() throws IoTDBConnectionException, StatementExecutionException {
+    String deviceId = "root.sg1.d1";
+    List<String> measurements = new ArrayList<>();
+    measurements.add("s1");
+    List<String> deviceIds = new ArrayList<>();
+    List<List<String>> measurementsList = new ArrayList<>();
+    List<List<Object>> valuesList = new ArrayList<>();
+    List<Long> timestamps = new ArrayList<>();
+    List<List<TSDataType>> typesList = new ArrayList<>();
+
+    for (long time = 0; time < 500; time++) {
+      List<Object> values = new ArrayList<>();
+      List<TSDataType> types = new ArrayList<>();
+      values.add(1L);
+      values.add(2L);
+      values.add(3L);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+
+      deviceIds.add(deviceId);
+      measurementsList.add(measurements);
+      valuesList.add(values);
+      typesList.add(types);
+      timestamps.add(time);
+      if (time != 0 && time % 100 == 0) {
+        session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+        deviceIds.clear();
+        measurementsList.clear();
+        valuesList.clear();
+        timestamps.clear();
+      }
+    }
+
+    session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+  }
 }
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index 0b64889..4010e50 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -291,6 +291,15 @@ struct TSRawDataQueryReq {
   7: optional bool enableRedirectQuery;
 }
 
+struct TSLastDataQueryReq {
+  1: required i64 sessionId
+  2: required list<string> paths
+  3: optional i32 fetchSize
+  4: required i64 time
+  5: required i64 statementId
+  6: optional bool enableRedirectQuery;
+}
+
 struct TSCreateMultiTimeseriesReq {
   1: required i64 sessionId
   2: required list<string> paths
@@ -395,6 +404,8 @@ service TSIService {
 
   TSExecuteStatementResp executeRawDataQuery(1:TSRawDataQueryReq req);
 
+  TSExecuteStatementResp executeLastDataQuery(1:TSLastDataQueryReq req);
+
   i64 requestStatementId(1:i64 sessionId);
 
   TSStatus createDeviceTemplate(1:TSCreateDeviceTemplateReq req);