You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/05/16 12:17:19 UTC
[iotdb] 02/02: Add session interface
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch caLastOpt
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e037bc853b5a4497b804edc4bffa22c1a60b503e
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue May 16 20:17:06 2023 +0800
Add session interface
---
.../java/org/apache/iotdb/isession/ISession.java | 3 ++
.../apache/iotdb/isession/pool/ISessionPool.java | 4 ++
.../java/org/apache/iotdb/session/Session.java | 31 ++++++++++++++
.../apache/iotdb/session/SessionConnection.java | 49 ++++++++++++++++++++++
.../org/apache/iotdb/session/pool/SessionPool.java | 24 +++++++++++
5 files changed, 111 insertions(+)
diff --git a/isession/src/main/java/org/apache/iotdb/isession/ISession.java b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 4095afe368..2543512c1b 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -165,6 +165,9 @@ public interface ISession extends AutoCloseable {
SessionDataSet executeLastDataQuery(List<String> paths)
throws StatementExecutionException, IoTDBConnectionException;
+ SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List<String> sensors)
+ throws StatementExecutionException, IoTDBConnectionException;
+
SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations)
throws StatementExecutionException, IoTDBConnectionException;
diff --git a/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java b/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
index 2f5c017540..8a54038999 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
@@ -421,6 +421,10 @@ public interface ISessionPool {
SessionDataSetWrapper executeLastDataQuery(List<String> paths)
throws StatementExecutionException, IoTDBConnectionException;
+ SessionDataSetWrapper executeLastDataQueryForOneDevice(
+ String db, String device, List<String> sensors)
+ throws StatementExecutionException, IoTDBConnectionException;
+
SessionDataSetWrapper executeAggregationQuery(
List<String> paths, List<TAggregationType> aggregations)
throws StatementExecutionException, IoTDBConnectionException;
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 1ed59f5d0d..1402b7040b 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -861,6 +862,36 @@ public class Session implements ISession {
return executeLastDataQuery(paths, time, queryTimeoutInMs);
}
+ @Override
+ public SessionDataSet executeLastDataQueryForOneDevice(
+ String db, String device, List<String> sensors)
+ throws StatementExecutionException, IoTDBConnectionException {
+ Pair<SessionDataSet, TEndPoint> pair;
+ try {
+ pair =
+ getSessionConnection(device)
+ .executeLastDataQueryForOneDevice(db, device, sensors, queryTimeoutInMs);
+ if (pair.right != null) {
+ handleRedirection(device, pair.right);
+ }
+ return pair.left;
+ } catch (IoTDBConnectionException e) {
+ if (enableRedirection
+ && !deviceIdToEndpoint.isEmpty()
+ && deviceIdToEndpoint.get(device) != null) {
+ logger.warn("Session can not connect to {}", deviceIdToEndpoint.get(device));
+ deviceIdToEndpoint.remove(device);
+
+ // reconnect with default connection
+ return defaultSessionConnection.executeLastDataQueryForOneDevice(
+ db, device, sensors, queryTimeoutInMs)
+ .left;
+ } else {
+ throw e;
+ }
+ }
+ }
+
@Override
public SessionDataSet executeAggregationQuery(
List<String> paths, List<TAggregationType> aggregations)
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 6cbd1dc6b2..dea5ebf7b9 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -63,6 +64,7 @@ import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
import org.apache.iotdb.session.util.SessionUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -451,6 +453,53 @@ public class SessionConnection {
execResp.moreData);
}
+ protected Pair<SessionDataSet, TEndPoint> executeLastDataQueryForOneDevice(
+ String db, String device, List<String> sensors, long timeOut)
+ throws StatementExecutionException, IoTDBConnectionException {
+ TSFastLastDataQueryForOneDeviceReq req =
+ new TSFastLastDataQueryForOneDeviceReq(sessionId, db, device, sensors, statementId);
+ req.setFetchSize(session.fetchSize);
+ req.setEnableRedirectQuery(enableRedirect);
+ req.setLegalPathNodes(true);
+ req.setTimeout(timeOut);
+ TSExecuteStatementResp tsExecuteStatementResp = null;
+ TEndPoint redirectedEndPoint = null;
+ try {
+ tsExecuteStatementResp = client.executeFastLastDataQueryForOneDeviceV2(req);
+ RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
+ } catch (RedirectException e) {
+ redirectedEndPoint = e.getEndPoint();
+ } catch (TException e) {
+ if (reconnect()) {
+ try {
+ req.setSessionId(sessionId);
+ req.setStatementId(statementId);
+ tsExecuteStatementResp = client.executeFastLastDataQueryForOneDeviceV2(req);
+ } catch (TException tException) {
+ throw new IoTDBConnectionException(tException);
+ }
+ } else {
+ throw new IoTDBConnectionException(logForReconnectionFailure());
+ }
+ }
+
+ RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
+ return new Pair<>(
+ new SessionDataSet(
+ "",
+ tsExecuteStatementResp.getColumns(),
+ tsExecuteStatementResp.getDataTypeList(),
+ tsExecuteStatementResp.columnNameIndexMap,
+ tsExecuteStatementResp.getQueryId(),
+ statementId,
+ client,
+ sessionId,
+ tsExecuteStatementResp.queryResult,
+ tsExecuteStatementResp.isIgnoreTimeStamp(),
+ tsExecuteStatementResp.moreData),
+ redirectedEndPoint);
+ }
+
protected SessionDataSet executeLastDataQuery(List<String> paths, long time, long timeOut)
throws StatementExecutionException, IoTDBConnectionException, RedirectException {
TSLastDataQueryReq tsLastDataQueryReq =
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 369f723f2d..110165ca33 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -2584,6 +2584,30 @@ public class SessionPool implements ISessionPool {
return null;
}
+ @Override
+ public SessionDataSetWrapper executeLastDataQueryForOneDevice(
+ String db, String device, List<String> sensors)
+ throws StatementExecutionException, IoTDBConnectionException {
+ for (int i = 0; i < RETRY; i++) {
+ ISession session = getSession();
+ try {
+ SessionDataSet resp = session.executeLastDataQueryForOneDevice(db, device, sensors);
+ SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+ occupy(session);
+ return wrapper;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ logger.warn("executeLastDataQuery failed", e);
+ cleanSessionAndMayThrowConnectionException(session, i, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ // never go here
+ return null;
+ }
+
@Override
public SessionDataSetWrapper executeAggregationQuery(
List<String> paths, List<TAggregationType> aggregations)