You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/14 07:05:14 UTC
[iotdb] 01/05: change interface name
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 38cc84b784d2e458091d7e0798be3aff82d42fe6
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Nov 14 10:07:34 2022 +0800
change interface name
---
.../main/java/org/apache/iotdb/SessionExample.java | 5 ++--
.../iotdb/session/it/IoTDBFetchWindowSetIT.java | 2 +-
.../db/mpp/plan/parser/StatementGenerator.java | 4 ++--
.../service/thrift/impl/ClientRPCServiceImpl.java | 27 ++++++++++------------
.../db/service/thrift/impl/TSServiceImpl.java | 6 ++---
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 2 +-
.../main/java/org/apache/iotdb/rpc/RpcUtils.java | 10 ++++----
.../java/org/apache/iotdb/session/ISession.java | 2 +-
.../java/org/apache/iotdb/session/Session.java | 4 ++--
.../apache/iotdb/session/SessionConnection.java | 24 +++++++++++--------
thrift/src/main/thrift/client.thrift | 8 +++----
11 files changed, 48 insertions(+), 46 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index fbe0c63c8a..8760400c91 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -74,8 +74,9 @@ public class SessionExample {
List<String> queryPaths = Arrays.asList("root.sg1.d1.s1", "root.sg1.d2.s1");
List<Integer> indexes = Arrays.asList(1, 3, 5, 7);
- List<SessionDataSet> windowSet = session.fetchWindowSet(queryPaths, null, 1, 40, 2, 2, indexes);
- for (SessionDataSet window : windowSet) {
+ List<SessionDataSet> windowBatch =
+ session.fetchWindowBatch(queryPaths, null, 1, 40, 2, 2, indexes);
+ for (SessionDataSet window : windowBatch) {
System.out.println(window.getColumnNames());
while (window.hasNext()) {
System.out.println(window.next());
diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBFetchWindowSetIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBFetchWindowSetIT.java
index 1e31bd5c83..5c56924b03 100644
--- a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBFetchWindowSetIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBFetchWindowSetIT.java
@@ -172,7 +172,7 @@ public class IoTDBFetchWindowSetIT {
List<String> queryPaths = Arrays.asList("root.sg1.d1.s1", "root.sg1.d2.s1");
List<Integer> indexes = Arrays.asList(1, 3, 5, 7);
List<SessionDataSet> windowSet =
- session.fetchWindowSet(queryPaths, null, 0, 20, 2, 2, indexes);
+ session.fetchWindowBatch(queryPaths, null, 0, 20, 2, 2, indexes);
Assert.assertEquals(indexes.size(), windowSet.size());
for (SessionDataSet window : windowSet) {
while (window.hasNext()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index dadd1b3466..d0b5f5167b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -70,7 +70,7 @@ import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -179,7 +179,7 @@ public class StatementGenerator {
return lastQueryStatement;
}
- public static Statement createStatement(TSFetchWindowSetReq fetchWindowSetReq, ZoneId zoneId)
+ public static Statement createStatement(TSFetchWindowBatchReq fetchWindowSetReq)
throws IllegalPathException {
FetchWindowSetStatement statement = new FetchWindowSetStatement();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 1d7d875999..61363a1d6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -88,8 +88,8 @@ import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetResp;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchResp;
import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
@@ -132,7 +132,6 @@ import static org.apache.iotdb.db.service.basic.ServiceProvider.AUDIT_LOGGER;
import static org.apache.iotdb.db.service.basic.ServiceProvider.CONFIG;
import static org.apache.iotdb.db.service.basic.ServiceProvider.CURRENT_RPC_VERSION;
import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
-import static org.apache.iotdb.db.service.basic.ServiceProvider.SESSION_MANAGER;
import static org.apache.iotdb.db.service.basic.ServiceProvider.SLOW_SQL_LOGGER;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
@@ -421,19 +420,18 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
@Override
- public TSFetchWindowSetResp fetchWindowSet(TSFetchWindowSetReq req) throws TException {
+ public TSFetchWindowBatchResp fetchWindowBatch(TSFetchWindowBatchReq req) throws TException {
if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) {
- return RpcUtils.getTSFetchWindowSetResp(getNotLoggedInStatus());
+ return RpcUtils.getTSFetchWindowBatchResp(getNotLoggedInStatus());
}
long startTime = System.currentTimeMillis();
try {
- Statement s =
- StatementGenerator.createStatement(req, SESSION_MANAGER.getCurrSession().getZoneId());
+ Statement s = StatementGenerator.createStatement(req);
// permission check
TSStatus status = AuthorityChecker.checkAuthority(s, SESSION_MANAGER.getCurrSession());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return RpcUtils.getTSFetchWindowSetResp(status);
+ return RpcUtils.getTSFetchWindowBatchResp(status);
}
QUERY_FREQUENCY_RECORDER.incrementAndGet();
@@ -458,14 +456,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
- TSFetchWindowSetResp resp =
- createTSFetchWindowSetResp(queryExecution.getDatasetHeader(), queryId);
- resp.setQueryResultList(QueryDataSetUtils.convertTsBlocksToWindowSet(queryExecution));
+ TSFetchWindowBatchResp resp =
+ createTSFetchWindowBatchResp(queryExecution.getDatasetHeader());
+ resp.setWindowBatch(QueryDataSetUtils.convertTsBlocksToWindowBatch(queryExecution));
return resp;
}
} catch (Exception e) {
// TODO call the coordinator to release query resource
- return RpcUtils.getTSFetchWindowSetResp(
+ return RpcUtils.getTSFetchWindowBatchResp(
onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY));
} finally {
addOperationLatency(Operation.EXECUTE_QUERY, startTime);
@@ -1798,12 +1796,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
return resp;
}
- private TSFetchWindowSetResp createTSFetchWindowSetResp(DatasetHeader header, long queryId) {
- TSFetchWindowSetResp resp = RpcUtils.getTSFetchWindowSetResp(TSStatusCode.SUCCESS_STATUS);
+ private TSFetchWindowBatchResp createTSFetchWindowBatchResp(DatasetHeader header) {
+ TSFetchWindowBatchResp resp = RpcUtils.getTSFetchWindowBatchResp(TSStatusCode.SUCCESS_STATUS);
resp.setColumnNameList(header.getRespColumns());
resp.setColumnTypeList(header.getRespDataTypeList());
resp.setColumnNameIndexMap(header.getColumnNameIndexMap());
- resp.setQueryId(queryId);
return resp;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 9dd837b131..7470898fac 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -103,8 +103,8 @@ import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetResp;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchResp;
import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
@@ -279,7 +279,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
}
@Override
- public TSFetchWindowSetResp fetchWindowSet(TSFetchWindowSetReq req) throws TException {
+ public TSFetchWindowBatchResp fetchWindowBatch(TSFetchWindowBatchReq req) throws TException {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 9ac3cbd404..d053e4b953 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -400,7 +400,7 @@ public class QueryDataSetUtils {
return res;
}
- public static List<List<ByteBuffer>> convertTsBlocksToWindowSet(IQueryExecution queryExecution)
+ public static List<List<ByteBuffer>> convertTsBlocksToWindowBatch(IQueryExecution queryExecution)
throws IoTDBException {
List<List<ByteBuffer>> windowSet = new ArrayList<>();
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index cbedcf75e2..01668c4e24 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxTSStatus;
import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetResp;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchResp;
import java.lang.reflect.Proxy;
import java.text.SimpleDateFormat;
@@ -226,16 +226,16 @@ public class RpcUtils {
return resp;
}
- public static TSFetchWindowSetResp getTSFetchWindowSetResp(TSStatus status) {
- TSFetchWindowSetResp resp = new TSFetchWindowSetResp();
+ public static TSFetchWindowBatchResp getTSFetchWindowBatchResp(TSStatus status) {
+ TSFetchWindowBatchResp resp = new TSFetchWindowBatchResp();
TSStatus tsStatus = new TSStatus(status);
resp.setStatus(tsStatus);
return resp;
}
- public static TSFetchWindowSetResp getTSFetchWindowSetResp(TSStatusCode tsStatusCode) {
+ public static TSFetchWindowBatchResp getTSFetchWindowBatchResp(TSStatusCode tsStatusCode) {
TSStatus status = getStatus(tsStatusCode);
- return getTSFetchWindowSetResp(status);
+ return getTSFetchWindowBatchResp(status);
}
public static final String DEFAULT_TIME_FORMAT = "default";
diff --git a/session/src/main/java/org/apache/iotdb/session/ISession.java b/session/src/main/java/org/apache/iotdb/session/ISession.java
index 1b140e0d9f..4e8b42a864 100644
--- a/session/src/main/java/org/apache/iotdb/session/ISession.java
+++ b/session/src/main/java/org/apache/iotdb/session/ISession.java
@@ -441,7 +441,7 @@ public interface ISession extends AutoCloseable {
TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException;
- List<SessionDataSet> fetchWindowSet(
+ List<SessionDataSet> fetchWindowBatch(
List<String> queryPaths,
String functionName,
long startTime,
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 68e35b367b..5523f9ea73 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -3264,7 +3264,7 @@ public class Session implements ISession {
}
@Override
- public List<SessionDataSet> fetchWindowSet(
+ public List<SessionDataSet> fetchWindowBatch(
List<String> queryPaths,
String functionName,
long startTime,
@@ -3273,7 +3273,7 @@ public class Session implements ISession {
long slidingStep,
List<Integer> indexes)
throws StatementExecutionException {
- return defaultSessionConnection.fetchWindowSet(
+ return defaultSessionConnection.fetchWindowBatch(
queryPaths, functionName, startTime, endTime, interval, slidingStep, indexes);
}
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 6966b89ae8..1cfa1666c3 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -39,8 +39,8 @@ import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetResp;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchResp;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -485,7 +485,7 @@ public class SessionConnection {
tsExecuteStatementResp.isIgnoreTimeStamp());
}
- public List<SessionDataSet> fetchWindowSet(
+ public List<SessionDataSet> fetchWindowBatch(
List<String> queryPaths,
String functionName,
long startTime,
@@ -494,31 +494,35 @@ public class SessionConnection {
long slidingStep,
List<Integer> indexes)
throws StatementExecutionException {
- TSFetchWindowSetReq req =
- new TSFetchWindowSetReq(
+ TSFetchWindowBatchReq req =
+ new TSFetchWindowBatchReq(
sessionId,
statementId,
queryPaths,
new TGroupByTimeParameter(startTime, endTime, interval, slidingStep, indexes));
- TSFetchWindowSetResp resp;
+ if (functionName != null) {
+ req.setFunctionName(functionName);
+ }
+
+ TSFetchWindowBatchResp resp;
try {
- resp = client.fetchWindowSet(req);
+ resp = client.fetchWindowBatch(req);
RpcUtils.verifySuccess(resp.getStatus());
} catch (TException e) {
throw new StatementExecutionException("");
}
List<SessionDataSet> windowSet = new ArrayList<>();
- for (List<ByteBuffer> queryResult : resp.getQueryResultList()) {
+ for (List<ByteBuffer> window : resp.getWindowBatch()) {
SessionDataSet sessionDataSet =
new SessionDataSet(
resp.columnNameList,
resp.columnTypeList,
resp.columnNameIndexMap,
- resp.queryId,
+ -1,
statementId,
sessionId,
- queryResult);
+ window);
windowSet.add(sessionDataSet);
}
return windowSet;
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 9474d0645b..6d5f6cfc11 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -421,7 +421,7 @@ struct TGroupByTimeParameter {
5: required list<i32> indexes
}
-struct TSFetchWindowSetReq {
+struct TSFetchWindowBatchReq {
1: required i64 sessionId
2: required i64 statementId
3: required list<string> queryPaths
@@ -429,12 +429,12 @@ struct TSFetchWindowSetReq {
5: required TGroupByTimeParameter groupByTimeParameter
}
-struct TSFetchWindowSetResp {
+struct TSFetchWindowBatchResp {
1: required common.TSStatus status
2: required list<string> columnNameList
3: required list<string> columnTypeList
4: required map<string, i32> columnNameIndexMap
- 5: required list<TSQueryDataSet> windowSet
+ 5: required list<list<binary>> windowBatch
}
// The sender and receiver need to check some info to confirm validity
@@ -585,5 +585,5 @@ service IClientRPCService {
TSConnectionInfoResp fetchAllConnectionsInfo();
- TSFetchWindowSetResp fetchWindowSet(1:TSFetchWindowSetReq req);
+ TSFetchWindowBatchResp fetchWindowBatch(1:TSFetchWindowBatchReq req);
}
\ No newline at end of file