You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/05/16 12:17:19 UTC

[iotdb] 02/02: Add session interface

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch caLastOpt
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e037bc853b5a4497b804edc4bffa22c1a60b503e
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue May 16 20:17:06 2023 +0800

    Add session interface
---
 .../java/org/apache/iotdb/isession/ISession.java   |  3 ++
 .../apache/iotdb/isession/pool/ISessionPool.java   |  4 ++
 .../java/org/apache/iotdb/session/Session.java     | 31 ++++++++++++++
 .../apache/iotdb/session/SessionConnection.java    | 49 ++++++++++++++++++++++
 .../org/apache/iotdb/session/pool/SessionPool.java | 24 +++++++++++
 5 files changed, 111 insertions(+)

diff --git a/isession/src/main/java/org/apache/iotdb/isession/ISession.java b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 4095afe368..2543512c1b 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -165,6 +165,9 @@ public interface ISession extends AutoCloseable {
   SessionDataSet executeLastDataQuery(List<String> paths)
       throws StatementExecutionException, IoTDBConnectionException;
 
+  SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List<String> sensors)
+      throws StatementExecutionException, IoTDBConnectionException;
+
   SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations)
       throws StatementExecutionException, IoTDBConnectionException;
 
diff --git a/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java b/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
index 2f5c017540..8a54038999 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
@@ -421,6 +421,10 @@ public interface ISessionPool {
   SessionDataSetWrapper executeLastDataQuery(List<String> paths)
       throws StatementExecutionException, IoTDBConnectionException;
 
+  SessionDataSetWrapper executeLastDataQueryForOneDevice(
+      String db, String device, List<String> sensors)
+      throws StatementExecutionException, IoTDBConnectionException;
+
   SessionDataSetWrapper executeAggregationQuery(
       List<String> paths, List<TAggregationType> aggregations)
       throws StatementExecutionException, IoTDBConnectionException;
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 1ed59f5d0d..1402b7040b 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -861,6 +862,36 @@ public class Session implements ISession {
     return executeLastDataQuery(paths, time, queryTimeoutInMs);
   }
 
+  /** Executes the one-device last-data query over the connection chosen for {@code device}. */
+  @Override
+  public SessionDataSet executeLastDataQueryForOneDevice(
+      String db, String device, List<String> sensors)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      Pair<SessionDataSet, TEndPoint> result =
+          getSessionConnection(device)
+              .executeLastDataQueryForOneDevice(db, device, sensors, queryTimeoutInMs);
+      if (result.right != null) {
+        handleRedirection(device, result.right);
+      }
+      return result.left;
+    } catch (IoTDBConnectionException e) {
+      boolean endpointCached =
+          enableRedirection
+              && !deviceIdToEndpoint.isEmpty()
+              && deviceIdToEndpoint.get(device) != null;
+      if (!endpointCached) {
+        throw e;
+      }
+      logger.warn("Session can not connect to {}", deviceIdToEndpoint.get(device));
+      deviceIdToEndpoint.remove(device);
+      // The cached endpoint failed: forget it and retry once on the default connection.
+      return defaultSessionConnection.executeLastDataQueryForOneDevice(
+              db, device, sensors, queryTimeoutInMs)
+          .left;
+    }
+  }
+
   @Override
   public SessionDataSet executeAggregationQuery(
       List<String> paths, List<TAggregationType> aggregations)
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 6cbd1dc6b2..dea5ebf7b9 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
 import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -63,6 +64,7 @@ import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
 import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
 import org.apache.iotdb.session.util.SessionUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -451,6 +453,53 @@ public class SessionConnection {
         execResp.moreData);
   }
 
+  /** Sends the one-device last-data request; returns the data set and any redirect endpoint. */
+  protected Pair<SessionDataSet, TEndPoint> executeLastDataQueryForOneDevice(
+      String db, String device, List<String> sensors, long timeOut)
+      throws StatementExecutionException, IoTDBConnectionException {
+    TSFastLastDataQueryForOneDeviceReq request =
+        new TSFastLastDataQueryForOneDeviceReq(sessionId, db, device, sensors, statementId);
+    request.setFetchSize(session.fetchSize);
+    request.setEnableRedirectQuery(enableRedirect);
+    request.setLegalPathNodes(true);
+    request.setTimeout(timeOut);
+    TSExecuteStatementResp response = null;
+    TEndPoint redirectTarget = null;
+    try {
+      response = client.executeFastLastDataQueryForOneDeviceV2(request);
+      RpcUtils.verifySuccessWithRedirection(response.getStatus());
+    } catch (RedirectException e) {
+      redirectTarget = e.getEndPoint();
+    } catch (TException e) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(logForReconnectionFailure());
+      }
+      try {
+        // Re-apply the ids after reconnect() (presumably refreshed there) before the one retry.
+        request.setSessionId(sessionId);
+        request.setStatementId(statementId);
+        response = client.executeFastLastDataQueryForOneDeviceV2(request);
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+    RpcUtils.verifySuccess(response.getStatus());
+    SessionDataSet dataSet =
+        new SessionDataSet(
+            "",
+            response.getColumns(),
+            response.getDataTypeList(),
+            response.columnNameIndexMap,
+            response.getQueryId(),
+            statementId,
+            client,
+            sessionId,
+            response.queryResult,
+            response.isIgnoreTimeStamp(),
+            response.moreData);
+    return new Pair<>(dataSet, redirectTarget);
+  }
+
   protected SessionDataSet executeLastDataQuery(List<String> paths, long time, long timeOut)
       throws StatementExecutionException, IoTDBConnectionException, RedirectException {
     TSLastDataQueryReq tsLastDataQueryReq =
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 369f723f2d..110165ca33 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -2584,6 +2584,30 @@ public class SessionPool implements ISessionPool {
     return null;
   }
 
+  /** Runs the one-device last-data query on a pooled session, retrying on connection failure. */
+  @Override
+  public SessionDataSetWrapper executeLastDataQueryForOneDevice(
+      String db, String device, List<String> sensors)
+      throws StatementExecutionException, IoTDBConnectionException {
+    for (int i = 0; i < RETRY; i++) {
+      ISession session = getSession();
+      try {
+        SessionDataSet resp = session.executeLastDataQueryForOneDevice(db, device, sensors);
+        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+        occupy(session);
+        return wrapper;
+      } catch (IoTDBConnectionException e) {
+        // The connection is broken: clean up this session and retry with a new one.
+        logger.warn("executeLastDataQueryForOneDevice failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    return null; // not expected to be reached: the last retry is assumed to rethrow
+  }
+
   @Override
   public SessionDataSetWrapper executeAggregationQuery(
       List<String> paths, List<TAggregationType> aggregations)