You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ch...@apache.org on 2021/06/03 08:40:38 UTC
[iotdb] branch master updated: add query last data interface (#3219)
This is an automated email from the ASF dual-hosted git repository.
chaow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 17f10e3 add query last data interface (#3219)
17f10e3 is described below
commit 17f10e30db3dd2f2d76ba69596d179c1f4f873d6
Author: Hang Ji <55...@users.noreply.github.com>
AuthorDate: Thu Jun 3 16:40:10 2021 +0800
add query last data interface (#3219)
Co-authored-by: Xiangwei Wei <34...@users.noreply.github.com>
---
.../apache/iotdb/cluster/query/ClusterPlanner.java | 16 ++++++
docker/src/main/Dockerfile-cluster | 5 +-
docker/src/main/Dockerfile-single | 5 +-
.../main/java/org/apache/iotdb/SessionExample.java | 15 +++++
.../main/java/org/apache/iotdb/db/qp/Planner.java | 15 +++++
.../iotdb/db/qp/strategy/LogicalGenerator.java | 29 ++++++++++
.../org/apache/iotdb/db/service/TSServiceImpl.java | 32 +++++++++++
.../java/org/apache/iotdb/db/qp/PlannerTest.java | 16 ++++++
.../java/org/apache/iotdb/session/Session.java | 38 ++++++++++++
.../apache/iotdb/session/SessionConnection.java | 39 +++++++++++++
.../iotdb/session/IoTDBSessionComplexIT.java | 33 +++++++++++
.../test/java/org/apache/iotdb/db/sql/Cases.java | 67 ++++++++++++++++++++++
thrift/src/main/thrift/rpc.thrift | 11 ++++
13 files changed, 317 insertions(+), 4 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java
index a2469ac..f83a2de 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.strategy.LogicalChecker;
import org.apache.iotdb.db.qp.strategy.LogicalGenerator;
+import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
import java.time.ZoneId;
@@ -60,4 +61,19 @@ public class ClusterPlanner extends Planner {
return new ClusterPhysicalGenerator()
.transformToPhysicalPlan(operator, rawDataQueryReq.fetchSize);
}
+
+ @Override
+ public PhysicalPlan lastDataQueryReqToPhysicalPlan(
+ TSLastDataQueryReq lastDataQueryReq, ZoneId zoneId)
+ throws QueryProcessException, IllegalPathException {
+ // step 1: build the logical operator tree straight from the thrift request
+ Operator operator = LogicalGenerator.generate(lastDataQueryReq, zoneId);
+ // step 2: reject operator trees that contain logical errors
+ LogicalChecker.check(operator);
+ // step 3: run the logical optimizer; the request's fetchSize is passed along to it
+ operator = logicalOptimize(operator, lastDataQueryReq.fetchSize);
+ // step 4: turn the logical operator into a physical plan with the cluster generator
+ return new ClusterPhysicalGenerator()
+ .transformToPhysicalPlan(operator, lastDataQueryReq.fetchSize);
+ }
}
diff --git a/docker/src/main/Dockerfile-cluster b/docker/src/main/Dockerfile-cluster
index 277132b..abc324e 100644
--- a/docker/src/main/Dockerfile-cluster
+++ b/docker/src/main/Dockerfile-cluster
@@ -24,7 +24,7 @@ FROM openjdk:11-jre-slim
ADD distribution/target/apache-iotdb-*-cluster-bin.zip /
RUN apt update \
- && apt install lsof procps unzip -y \
+ && apt install lsof dos2unix procps unzip -y \
&& unzip /apache-iotdb-*-bin.zip -d / \
&& rm /apache-iotdb-*-bin.zip \
&& mv /apache-iotdb-* /iotdb \
@@ -32,7 +32,8 @@ RUN apt update \
&& apt autoremove -y \
&& apt purge --auto-remove -y \
&& apt clean -y
-
+RUN dos2unix /iotdb/sbin/start-node.sh
+RUN dos2unix /iotdb/sbin/../conf/iotdb-env.sh
EXPOSE 6667
EXPOSE 31999
EXPOSE 5555
diff --git a/docker/src/main/Dockerfile-single b/docker/src/main/Dockerfile-single
index c0e00a9..f283987 100644
--- a/docker/src/main/Dockerfile-single
+++ b/docker/src/main/Dockerfile-single
@@ -24,7 +24,7 @@ FROM openjdk:11-jre-slim
ADD distribution/target/apache-iotdb-*-server-bin.zip /
RUN apt update \
- && apt install lsof procps unzip -y \
+ && apt install lsof dos2unix procps unzip -y \
&& unzip /apache-iotdb-*-bin.zip -d / \
&& rm /apache-iotdb-*-bin.zip \
&& mv /apache-iotdb-* /iotdb \
@@ -32,7 +32,8 @@ RUN apt update \
&& apt autoremove -y \
&& apt purge --auto-remove -y \
&& apt clean -y
-
+RUN dos2unix /iotdb/sbin/start-server.sh
+RUN dos2unix /iotdb/sbin/../conf/iotdb-env.sh
EXPOSE 6667
EXPOSE 31999
EXPOSE 5555
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index d996d10..50b85d9 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -77,6 +77,7 @@ public class SessionExample {
query();
queryWithTimeout();
rawDataQuery();
+ lastDataQuery();
queryByIterator();
deleteData();
deleteTimeseries();
@@ -577,6 +578,20 @@ public class SessionExample {
dataSet.closeOperationHandle();
}
+ private static void lastDataQuery() throws IoTDBConnectionException, StatementExecutionException {
+ List<String> paths = new ArrayList<>();
+ paths.add(ROOT_SG1_D1_S1);
+ paths.add(ROOT_SG1_D1_S2);
+ paths.add(ROOT_SG1_D1_S3);
+ SessionDataSet sessionDataSet = session.executeLastDataQuery(paths, 3); // last points with time >= 3
+ System.out.println(sessionDataSet.getColumnNames());
+ sessionDataSet.setFetchSize(1024);
+ while (sessionDataSet.hasNext()) { // print every returned row
+ System.out.println(sessionDataSet.next());
+ }
+ sessionDataSet.closeOperationHandle(); // not reached if the loop above throws
+ }
+
private static void queryByIterator()
throws IoTDBConnectionException, StatementExecutionException {
SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1");
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
index db4722a..1221fed 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.qp.strategy.optimizer.DnfFilterOptimizer;
import org.apache.iotdb.db.qp.strategy.optimizer.MergeSingleFilterOptimizer;
import org.apache.iotdb.db.qp.strategy.optimizer.RemoveNotOptimizer;
import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
import java.time.ZoneId;
@@ -75,6 +76,20 @@ public class Planner {
return new PhysicalGenerator().transformToPhysicalPlan(operator, rawDataQueryReq.fetchSize);
}
+ /** Converts a last data query request into a physical plan without parsing any SQL text. */
+ public PhysicalPlan lastDataQueryReqToPhysicalPlan(
+ TSLastDataQueryReq lastDataQueryReq, ZoneId zoneId)
+ throws QueryProcessException, IllegalPathException {
+ // step 1: build the logical operator tree straight from the thrift request
+ Operator operator = LogicalGenerator.generate(lastDataQueryReq, zoneId);
+ // step 2: reject operator trees that contain logical errors
+ LogicalChecker.check(operator);
+ // step 3: run the logical optimizer; the request's fetchSize is passed along to it
+ operator = logicalOptimize(operator, lastDataQueryReq.fetchSize);
+ // step 4: turn the optimized logical operator into a physical plan
+ return new PhysicalGenerator().transformToPhysicalPlan(operator, lastDataQueryReq.fetchSize);
+ }
+
/**
* given an unoptimized logical operator tree and return a optimized result.
*
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
index 51a5e72..0cdfc38 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.qp.sql.SqlBaseLexer;
import org.apache.iotdb.db.qp.sql.SqlBaseParser;
import org.apache.iotdb.db.query.expression.ResultColumn;
import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
+import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
import org.antlr.v4.runtime.CharStream;
@@ -123,5 +124,33 @@ public class LogicalGenerator {
return queryOp;
}
+ public static Operator generate(TSLastDataQueryReq req, ZoneId zoneId)
+ throws IllegalPathException {
+ // builds the operator tree of a last query over req.getPaths() with a filter on req.getTime()
+ SelectOperator selectOp = new SelectOperator(SQLConstant.TOK_SELECT, zoneId);
+ FromOperator fromOp = new FromOperator(SQLConstant.TOK_FROM);
+ QueryOperator queryOp = new QueryOperator(SQLConstant.TOK_QUERY);
+ // select part: a single result column with an empty path, marked as a last query
+ selectOp.addResultColumn(new ResultColumn(new TimeSeriesOperand(new PartialPath(""))));
+ selectOp.markAsLastQuery();
+ // from part: every requested path is added as a prefix path
+ for (String p : req.getPaths()) {
+ PartialPath path = new PartialPath(p);
+ fromOp.addPrefixTablePath(path);
+ }
+
+ queryOp.setSelectOperator(selectOp);
+ queryOp.setFromOperator(fromOp);
+ // filter part: TIME compared with req.getTime() using GREATERTHANOREQUALTO
+ PartialPath timePath = new PartialPath(TIME);
+
+ BasicFunctionOperator basicFunctionOperator =
+ new BasicFunctionOperator(
+ SQLConstant.GREATERTHANOREQUALTO, timePath, Long.toString(req.getTime()));
+ queryOp.setFilterOperator(basicFunctionOperator);
+
+ return queryOp;
+ }
+
private LogicalGenerator() {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index fc8c344..f09454e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -117,6 +117,7 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
+import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
@@ -741,6 +742,37 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
+ @Override
+ public TSExecuteStatementResp executeLastDataQuery(TSLastDataQueryReq req) throws TException {
+ try {
+ if (!checkLogin(req.getSessionId())) {
+ return RpcUtils.getTSExecuteStatementResp(TSStatusCode.NOT_LOGIN_ERROR);
+ }
+ // build the plan with the session's zone id; a plan that is not a query is answered with an error
+ PhysicalPlan physicalPlan =
+ processor.lastDataQueryReqToPhysicalPlan(req, sessionIdZoneIdMap.get(req.getSessionId()));
+ return physicalPlan.isQuery()
+ ? internalExecuteQueryStatement(
+ "",
+ req.statementId,
+ physicalPlan,
+ req.fetchSize,
+ config.getQueryTimeoutThreshold(),
+ sessionIdUsernameMap.get(req.getSessionId()),
+ req.isEnableRedirectQuery())
+ : RpcUtils.getTSExecuteStatementResp(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+ } catch (InterruptedException e) {
+ LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
+ Thread.currentThread().interrupt(); // re-assert the interrupt status before returning
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, "executing lastDataQueryReqToPhysicalPlan"));
+ } catch (Exception e) {
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, "executing lastDataQueryReqToPhysicalPlan"));
+ }
+ }
+
/**
* @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByTimePlan, UDFPlan,
* some AuthorPlan
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/PlannerTest.java b/server/src/test/java/org/apache/iotdb/db/qp/PlannerTest.java
index 508ebe4..e1e5386 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/PlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/PlannerTest.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -253,4 +254,19 @@ public class PlannerTest {
assertEquals(paths.get(0), physicalPlan.getPaths().get(0).getFullPath());
assertEquals(paths.get(1), physicalPlan.getPaths().get(1).getFullPath());
}
+
+ @Test
+ public void lastDataQueryReqToPhysicalPlanTest()
+ throws QueryProcessException, IllegalPathException {
+ TSLastDataQueryReq tsLastDataQueryReq = new TSLastDataQueryReq();
+ List<String> paths = new ArrayList<>();
+ paths.add("root.vehicle.device1.sensor1");
+ tsLastDataQueryReq.setPaths(paths);
+ tsLastDataQueryReq.setTime(0); // time bound of the last query
+ tsLastDataQueryReq.setFetchSize(1000);
+ PhysicalPlan physicalPlan =
+ processor.lastDataQueryReqToPhysicalPlan(tsLastDataQueryReq, ZoneId.of("Asia/Shanghai"));
+ assertEquals(OperatorType.LAST, physicalPlan.getOperatorType()); // must be planned as a LAST plan
+ assertEquals(paths.get(0), physicalPlan.getPaths().get(0).getFullPath()); // over the requested path
+ }
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 6fc4ab4..f9e4c9c 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -590,6 +590,44 @@ public class Session {
}
/**
+ * Runs a last query of the form: select last status from root.ln.d1.s1 where time >= 1621326244168;
+ *
+ * @param paths full time series paths, e.g. root.ln.d1.s1, root.ln.d1.s2
+ * @param LastTime inclusive lower bound: only last points whose timestamp is greater than or
+ * equal to LastTime are returned, e.g. 1621326244168
+ */
+ public SessionDataSet executeLastDataQuery(List<String> paths, long LastTime)
+ throws StatementExecutionException, IoTDBConnectionException {
+ try {
+ return defaultSessionConnection.executeLastDataQuery(paths, LastTime);
+ } catch (RedirectException e) {
+ handleQueryRedirection(e.getEndPoint());
+ if (enableQueryRedirection) {
+ // retry once; a second RedirectException is reported as a StatementExecutionException
+ try {
+ return defaultSessionConnection.executeLastDataQuery(paths, LastTime);
+ } catch (RedirectException redirectException) {
+ logger.error("redirect twice", redirectException);
+ throw new StatementExecutionException("redirect twice, please try again.");
+ }
+ } else {
+ throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
+ }
+ }
+ }
+
+ /**
+ * Last query with a time lower bound of 0, e.g. select last status from root.ln.wf01.wt01;
+ *
+ * @param paths full time series paths (prefix path + suffix path), e.g. root.ln.d1.s1, root.ln.d1.s2
+ */
+ public SessionDataSet executeLastDataQuery(List<String> paths)
+ throws StatementExecutionException, IoTDBConnectionException {
+ long time = 0L;
+ return executeLastDataQuery(paths, time);
+ }
+
+ /**
* insert data in one row, if you want to improve your performance, please use insertRecords
* method or insertTablet method
*
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 402523d..361c87b 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
+import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
@@ -394,6 +395,44 @@ public class SessionConnection {
execResp.isIgnoreTimeStamp());
}
+ protected SessionDataSet executeLastDataQuery(List<String> paths, long time)
+ throws StatementExecutionException, IoTDBConnectionException, RedirectException {
+ TSLastDataQueryReq tsLastDataQueryReq =
+ new TSLastDataQueryReq(sessionId, paths, time, statementId);
+ tsLastDataQueryReq.setFetchSize(session.fetchSize);
+ tsLastDataQueryReq.setEnableRedirectQuery(enableRedirect);
+ TSExecuteStatementResp tsExecuteStatementResp;
+ try {
+ tsExecuteStatementResp = client.executeLastDataQuery(tsLastDataQueryReq);
+ RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
+ } catch (TException e) {
+ if (reconnect()) { // first call failed with a TException: retry once if reconnect() succeeds
+ try {
+ tsLastDataQueryReq.setSessionId(sessionId); // NOTE(review): presumably refreshed by reconnect() - confirm
+ tsLastDataQueryReq.setStatementId(statementId);
+ tsExecuteStatementResp = client.executeLastDataQuery(tsLastDataQueryReq);
+ } catch (TException tException) {
+ throw new IoTDBConnectionException(tException);
+ }
+ } else {
+ throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+ }
+ }
+ // reached after the first attempt or the retry; the retry's status is only checked here
+ RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
+ return new SessionDataSet(
+ "",
+ tsExecuteStatementResp.getColumns(),
+ tsExecuteStatementResp.getDataTypeList(),
+ tsExecuteStatementResp.columnNameIndexMap,
+ tsExecuteStatementResp.getQueryId(),
+ statementId,
+ client,
+ sessionId,
+ tsExecuteStatementResp.queryDataSet,
+ tsExecuteStatementResp.isIgnoreTimeStamp());
+ }
+
protected void insertRecord(TSInsertRecordReq request)
throws IoTDBConnectionException, StatementExecutionException, RedirectException {
request.setSessionId(sessionId);
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
index 281c669..6b1234c 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
@@ -292,6 +292,20 @@ public class IoTDBSessionComplexIT {
}
@Test
+ public void testLastDataQuery() throws IoTDBConnectionException, StatementExecutionException {
+ session = new Session("127.0.0.1", 6667, "root", "root");
+ session.open();
+ // set-up for the query below; createTimeseries() and insertRecords() are defined outside this hunk
+ session.setStorageGroup("root.sg1");
+
+ createTimeseries();
+
+ insertRecords();
+ // run the last query; the assertions on its result live in lastDataQuery()
+ lastDataQuery();
+ }
+
+ @Test
public void test()
throws ClassNotFoundException, SQLException, IoTDBConnectionException,
StatementExecutionException {
@@ -594,6 +608,25 @@ public class IoTDBSessionComplexIT {
sessionDataSet.closeOperationHandle();
}
+ private void lastDataQuery() throws StatementExecutionException, IoTDBConnectionException {
+ List<String> paths = new ArrayList<>();
+
+ paths.add("root.sg1.d1.s1");
+ paths.add("root.sg1.d2.s1");
+ // no time argument: this overload runs the query with time 0
+ SessionDataSet sessionDataSet = session.executeLastDataQuery(paths);
+ sessionDataSet.setFetchSize(1024);
+ // exactly one row is expected, and it must read [root.sg1.d2.s1, 1]
+ int count = 0;
+ while (sessionDataSet.hasNext()) {
+ count++;
+ List<Field> fields = sessionDataSet.next().getFields();
+ Assert.assertEquals("[root.sg1.d2.s1, 1]", fields.toString());
+ }
+ Assert.assertEquals(1, count);
+ sessionDataSet.closeOperationHandle();
+ }
+
private void insertTablet(String deviceId)
throws IoTDBConnectionException, StatementExecutionException {
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
index 17e4f53..67d355c 100644
--- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
@@ -245,4 +246,70 @@ public abstract class Cases {
Assert.assertEquals(10, next.getFields().get(0).getLongV());
Assert.assertEquals(10, next.getFields().get(1).getLongV());
}
+
+ @Test
+ public void clusterLastQueryTest() throws IoTDBConnectionException, StatementExecutionException {
+ // two INT64 series are created, but insertRecords() below only writes to root.sg1.d1
+ session.setStorageGroup("root.sg1");
+ session.createTimeseries(
+ "root.sg1.d1.s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
+ session.createTimeseries(
+ "root.sg1.d2.s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
+
+ insertRecords();
+
+ List<String> paths = new ArrayList<>();
+
+ paths.add("root.sg1.d1.s1");
+ paths.add("root.sg1.d2.s1");
+ // both series are queried; the overload without a time argument runs the query with time 0
+ SessionDataSet sessionDataSet = session.executeLastDataQuery(paths);
+ sessionDataSet.setFetchSize(1024);
+ // exactly one row is expected, and it must read [root.sg1.d1.s1, 1]
+ int count = 0;
+ while (sessionDataSet.hasNext()) {
+ count++;
+ List<Field> fields = sessionDataSet.next().getFields();
+ Assert.assertEquals("[root.sg1.d1.s1, 1]", fields.toString());
+ }
+ Assert.assertEquals(1, count);
+ sessionDataSet.closeOperationHandle();
+ }
+
+ private void insertRecords() throws IoTDBConnectionException, StatementExecutionException {
+ String deviceId = "root.sg1.d1";
+ List<String> measurements = new ArrayList<>();
+ measurements.add("s1");
+ List<String> deviceIds = new ArrayList<>();
+ List<List<String>> measurementsList = new ArrayList<>();
+ List<List<Object>> valuesList = new ArrayList<>();
+ List<Long> timestamps = new ArrayList<>();
+ List<List<TSDataType>> typesList = new ArrayList<>();
+
+ // writes rows for time 0..499 with s1 = 1 into root.sg1.d1, sent to the session in batches
+ for (long time = 0; time < 500; time++) {
+ // one value and one type per row, matching the single measurement s1
+ List<Object> values = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ values.add(1L);
+ types.add(TSDataType.INT64);
+
+ deviceIds.add(deviceId);
+ measurementsList.add(measurements);
+ valuesList.add(values);
+ typesList.add(types);
+ timestamps.add(time);
+ if (time != 0 && time % 100 == 0) {
+ session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+ // reset every per-batch list, including typesList, so the five lists stay aligned
+ deviceIds.clear();
+ measurementsList.clear();
+ valuesList.clear();
+ typesList.clear();
+ timestamps.clear();
+ }
+ }
+ // send the rows accumulated since the last batch
+ session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+ }
}
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index 0b64889..4010e50 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -291,6 +291,15 @@ struct TSRawDataQueryReq {
7: optional bool enableRedirectQuery;
}
+struct TSLastDataQueryReq {
+ 1: required i64 sessionId
+ 2: required list<string> paths // series whose last points are requested
+ 3: optional i32 fetchSize
+ 4: required i64 time // inclusive lower bound on the timestamp of the returned points
+ 5: required i64 statementId
+ 6: optional bool enableRedirectQuery;
+}
+
struct TSCreateMultiTimeseriesReq {
1: required i64 sessionId
2: required list<string> paths
@@ -395,6 +404,8 @@ service TSIService {
TSExecuteStatementResp executeRawDataQuery(1:TSRawDataQueryReq req);
+ TSExecuteStatementResp executeLastDataQuery(1:TSLastDataQueryReq req);
+
i64 requestStatementId(1:i64 sessionId);
TSStatus createDeviceTemplate(1:TSCreateDeviceTemplateReq req);